From 42b6aeca28664767add190374b13f57c99e17286 Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Mon, 9 Nov 2020 19:26:13 -0800 Subject: [PATCH] [HUDI-1358] Fix Memory Leak in HoodieLogFormatWriter (#2217) --- .../index/hbase/SparkHoodieHBaseIndex.java | 18 +++++++++++------- .../common/table/log/HoodieLogFileReader.java | 9 +++++++-- .../table/log/HoodieLogFormatWriter.java | 14 ++++++++++++-- 3 files changed, 30 insertions(+), 11 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java index 77659b7251b1b..5b67f838509bd 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java @@ -97,6 +97,7 @@ public class SparkHoodieHBaseIndex extends SparkH private int maxQpsPerRegionServer; private long totalNumInserts; private int numWriteStatusWithInserts; + private static transient Thread shutdownThread; /** * multiPutBatchSize will be computed and re-set in updateLocation if @@ -155,13 +156,16 @@ private Connection getHBaseConnection() { * exits. */ private void addShutDownHook() { - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - hbaseConnection.close(); - } catch (Exception e) { - // fail silently for any sort of exception - } - })); + if (null == shutdownThread) { + shutdownThread = new Thread(() -> { + try { + hbaseConnection.close(); + } catch (Exception e) { + // fail silently for any sort of exception + } + }); + Runtime.getRuntime().addShutdownHook(shutdownThread); + } } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index 27884ec58e1aa..14d523ad9825e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -67,6 +67,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { private long lastReverseLogFilePosition; private boolean reverseReader; private boolean closed = false; + private transient Thread shutdownThread = null; public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean readBlockLazily, boolean reverseReader) throws IOException { @@ -108,14 +109,15 @@ public HoodieLogFile getLogFile() { * Close the inputstream if not closed when the JVM exits. */ private void addShutDownHook() { - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + shutdownThread = new Thread(() -> { try { close(); } catch (Exception e) { LOG.warn("unable to close input stream for log file " + logFile, e); // fail silently for any sort of exception } - })); + }); + Runtime.getRuntime().addShutdownHook(shutdownThread); } // TODO : convert content and block length to long by using ByteBuffer, raw byte [] allows @@ -291,6 +293,9 @@ private long scanForNextAvailableBlockOffset() throws IOException { public void close() throws IOException { if (!closed) { this.inputStream.close(); + if (null != shutdownThread) { + Runtime.getRuntime().removeShutdownHook(shutdownThread); + } closed = true; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java index 8909477d90193..7fe21e9b2955d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java @@ -55,6 +55,8 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { private final String rolloverLogWriteToken; private FSDataOutputStream output; private boolean closed = false; + private transient Thread shutdownThread = null; + private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet"; /** @@ -222,6 +224,13 @@ private void createNewFile() throws IOException { @Override public void close() throws IOException { + if (null != shutdownThread) { + Runtime.getRuntime().removeShutdownHook(shutdownThread); + } + closeStream(); + } + + private void closeStream() throws IOException { if (output != null) { flush(); output.close(); @@ -256,7 +265,7 @@ public long getCurrentSize() throws IOException { * Close the output stream when the JVM exits. */ private void addShutDownHook() { - Runtime.getRuntime().addShutdownHook(new Thread() { + shutdownThread = new Thread() { public void run() { try { if (output != null) { @@ -267,7 +276,8 @@ public void run() { // fail silently for any sort of exception } } - }); + }; + Runtime.getRuntime().addShutdownHook(shutdownThread); } private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e)