TEZ-4522: Use OpenFile where FileStatus is available. (#318) (Ayush Saxena reviewed by Laszlo Bodor)

ayushtkn authored Dec 4, 2023
1 parent 0ed5e32 commit 0c5cf68
Showing 5 changed files with 22 additions and 19 deletions.
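The change applies one pattern across all five files: wherever a FileStatus has already been fetched, it is passed to the FileSystem.openFile() builder instead of calling fs.open(), so the filesystem can reuse the metadata rather than looking it up again (on object stores such as S3A, the second lookup is typically an extra HEAD request). A minimal before/after sketch of the pattern follows; the class and method names (OpenFileSketch, openWithStatus) are illustrative only, and the builder API assumes a Hadoop 3.3+ classpath:

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.FutureIO;

public class OpenFileSketch {

  static FSDataInputStream openWithStatus(FileSystem fs, Path path) throws IOException {
    // One metadata lookup, reused twice: for the file length and to let
    // openFile() skip its own status check.
    FileStatus status = fs.getFileStatus(path);

    // Before: fs.open(path) -- many FileSystem implementations re-fetch
    // the file status internally.
    // After: hand the already-fetched status to the openFile() builder
    // and await the asynchronous open.
    return FutureIO.awaitFuture(
        fs.openFile(path)
            .withFileStatus(status)
            .build());
  }
}
```

FutureIO.awaitFuture() unwraps the CompletableFuture returned by build() and rethrows failures as IOExceptions, which is why the call sites below keep their plain try/catch handling (for example, the FileNotFoundException catch in getSummaryStream is unchanged).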
@@ -38,6 +38,7 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -293,9 +294,10 @@ public static Path getSummaryRecoveryPath(Path attemptRecoverPath) {
   public static void mkDirForAM(FileSystem fs, Path dir) throws IOException {
     FsPermission perm = new FsPermission(TEZ_AM_DIR_PERMISSION);
     fs.mkdirs(dir, perm);
-    if (!fs.getFileStatus(dir).getPermission().equals(perm)) {
+    FileStatus fileStatus = fs.getFileStatus(dir);
+    if (!fileStatus.getPermission().equals(perm)) {
       LOG.warn("Directory " + dir.toString() + " created with unexpected permissions : "
-          + fs.getFileStatus(dir).getPermission() + ". Fixing permissions to correct value : "
+          + fileStatus.getPermission() + ". Fixing permissions to correct value : "
           + perm.toString());
       fs.setPermission(dir, perm);
     }
@@ -36,6 +36,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.functional.FutureIO;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.common.TezCommonUtils;
@@ -430,12 +431,11 @@ private Path getSummaryPath(Path attemptRecoveryDataDir) {
     return TezCommonUtils.getSummaryRecoveryPath(attemptRecoveryDataDir);
   }
 
-  private FSDataInputStream getSummaryStream(Path summaryPath) throws IOException {
+  private FSDataInputStream getSummaryStream(Path summaryPath, FileStatus summaryFileStatus) throws IOException {
     try {
-      return recoveryFS.open(summaryPath, recoveryBufferSize);
+      return FutureIO.awaitFuture(recoveryFS.openFile(summaryPath).withFileStatus(summaryFileStatus).build());
     } catch (FileNotFoundException fnf) {
       return null;
-
     }
   }
 
@@ -667,7 +667,7 @@ public DAGRecoveryData parseRecoveryData() throws IOException {
             + ", len=" + summaryFileStatus.getLen()
             + ", lastModTime=" + summaryFileStatus.getModificationTime());
         FSDataInputStream summaryStream = getSummaryStream(
-            summaryFile);
+            summaryFile, summaryFileStatus);
         while (true) {
           RecoveryProtos.SummaryEventProto proto;
           try {
@@ -33,6 +33,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.util.functional.FutureIO;
 import org.apache.tez.common.MRFrameworkConfigs;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
@@ -78,7 +79,7 @@ private static FSDataInputStream getFSDataIS(Configuration conf,
       throw new IOException("Split metadata size exceeded " + maxMetaInfoSize
           + ". Aborting job ");
     }
-    in = fs.open(metaSplitFile);
+    in = FutureIO.awaitFuture(fs.openFile(metaSplitFile).withFileStatus(fStatus).build());
     byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
     in.readFully(header);
     if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
@@ -28,11 +28,13 @@
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.functional.FutureIO;
 import org.apache.tez.runtime.library.common.Constants;
 
 public class TezSpillRecord {
@@ -66,11 +68,10 @@ public TezSpillRecord(Path indexFileName, FileSystem rfs, Checksum crc,
                         String expectedIndexOwner)
       throws IOException {
 
-    final FSDataInputStream in = rfs.open(indexFileName);
-    try {
-      final long length = rfs.getFileStatus(indexFileName).getLen();
-      final int partitions =
-          (int) length / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+    FileStatus fileStatus = rfs.getFileStatus(indexFileName);
+    final long length = fileStatus.getLen();
+    try (FSDataInputStream in = FutureIO.awaitFuture(rfs.openFile(indexFileName).withFileStatus(fileStatus).build())) {
+      final int partitions = (int) length / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
       final int size = partitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
 
       buf = ByteBuffer.allocate(size);
@@ -79,15 +80,12 @@ public TezSpillRecord(Path indexFileName, FileSystem rfs, Checksum crc,
         CheckedInputStream chk = new CheckedInputStream(in, crc);
         IOUtils.readFully(chk, buf.array(), 0, size);
         if (chk.getChecksum().getValue() != in.readLong()) {
-          throw new ChecksumException("Checksum error reading spill index: " +
-              indexFileName, -1);
+          throw new ChecksumException("Checksum error reading spill index: " + indexFileName, -1);
         }
       } else {
         IOUtils.readFully(in, buf.array(), 0, size);
       }
       entries = buf.asLongBuffer();
-    } finally {
-      in.close();
     }
   }
 
@@ -23,6 +23,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -32,6 +33,7 @@
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.functional.FutureIO;
 
 import java.io.BufferedReader;
 import java.io.EOFException;
@@ -74,9 +76,9 @@ public class TFileRecordReader extends RecordReader<Text, Text> {
 
     FileSystem fs = fileSplit.getPath().getFileSystem(context.getConfiguration());
     splitPath = fileSplit.getPath();
-    fin = fs.open(splitPath);
-    reader = new TFile.Reader(fin, fs.getFileStatus(splitPath).getLen(),
-        context.getConfiguration());
+    FileStatus fileStatus = fs.getFileStatus(splitPath);
+    fin = FutureIO.awaitFuture(fs.openFile(splitPath).withFileStatus(fileStatus).build());
+    reader = new TFile.Reader(fin, fileStatus.getLen(), context.getConfiguration());
     scanner = reader.createScannerByByteRange(start, fileSplit.getLength());
   }
 
