Skip to content

Commit

Permalink
[HUDI-1358] Fix Memory Leak in HoodieLogFormatWriter (#2217)
Browse files Browse the repository at this point in the history
  • Loading branch information
bvaradar authored Nov 10, 2020
1 parent 0364498 commit 42b6aec
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class SparkHoodieHBaseIndex<T extends HoodieRecordPayload> extends SparkH
private int maxQpsPerRegionServer;
private long totalNumInserts;
private int numWriteStatusWithInserts;
private static transient Thread shutdownThread;

/**
* multiPutBatchSize will be computed and re-set in updateLocation if
Expand Down Expand Up @@ -155,13 +156,16 @@ private Connection getHBaseConnection() {
* exits.
*/
private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
hbaseConnection.close();
} catch (Exception e) {
// fail silently for any sort of exception
}
}));
if (null == shutdownThread) {
shutdownThread = new Thread(() -> {
try {
hbaseConnection.close();
} catch (Exception e) {
// fail silently for any sort of exception
}
});
Runtime.getRuntime().addShutdownHook(shutdownThread);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
private long lastReverseLogFilePosition;
private boolean reverseReader;
private boolean closed = false;
private transient Thread shutdownThread = null;

public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException {
Expand Down Expand Up @@ -108,14 +109,15 @@ public HoodieLogFile getLogFile() {
* Close the inputstream if not closed when the JVM exits.
*/
private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
shutdownThread = new Thread(() -> {
try {
close();
} catch (Exception e) {
LOG.warn("unable to close input stream for log file " + logFile, e);
// fail silently for any sort of exception
}
}));
});
Runtime.getRuntime().addShutdownHook(shutdownThread);
}

// TODO : convert content and block length to long by using ByteBuffer, raw byte [] allows
Expand Down Expand Up @@ -291,6 +293,9 @@ private long scanForNextAvailableBlockOffset() throws IOException {
public void close() throws IOException {
if (!closed) {
this.inputStream.close();
if (null != shutdownThread) {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
}
closed = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
private final String rolloverLogWriteToken;
private FSDataOutputStream output;
private boolean closed = false;
private transient Thread shutdownThread = null;

private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet";

/**
Expand Down Expand Up @@ -222,6 +224,13 @@ private void createNewFile() throws IOException {

@Override
public void close() throws IOException {
if (null != shutdownThread) {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
}
closeStream();
}

private void closeStream() throws IOException {
if (output != null) {
flush();
output.close();
Expand Down Expand Up @@ -256,7 +265,7 @@ public long getCurrentSize() throws IOException {
* Close the output stream when the JVM exits.
*/
private void addShutDownHook() {
Runtime.getRuntime().addShutdownHook(new Thread() {
shutdownThread = new Thread() {
public void run() {
try {
if (output != null) {
Expand All @@ -267,7 +276,8 @@ public void run() {
// fail silently for any sort of exception
}
}
});
};
Runtime.getRuntime().addShutdownHook(shutdownThread);
}

private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e)
Expand Down

0 comments on commit 42b6aec

Please sign in to comment.