From 8240b526addbaf97834198fc98e1c8302c13b0a4 Mon Sep 17 00:00:00 2001 From: Przemyslaw Witek Date: Fri, 3 Jan 2020 09:33:34 +0100 Subject: [PATCH 1/2] Synchronize processInStream.close() call --- .../xpack/ml/process/AbstractNativeProcess.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index 62c060de03a50..33c2dc5ac6ef6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -30,6 +30,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; /** @@ -44,6 +45,9 @@ public abstract class AbstractNativeProcess implements NativeProcess { private final String jobId; private final CppLogMessageHandler cppLogHandler; private final OutputStream processInStream; + // We need this as in Java 8 closing {@link FilterOutputStream} is not idempotent (i.e. cannot be performed twice). + // For more details regarding the underlying issue see https://bugs.openjdk.java.net/browse/JDK-8054565 + private final AtomicBoolean processInStreamClosed = new AtomicBoolean(); private final InputStream processOutStream; private final OutputStream processRestoreStream; private final LengthEncodedWriter recordWriter; @@ -163,7 +167,10 @@ public void close() throws IOException { processCloseInitiated = true; // closing its input causes the process to exit if (processInStream != null) { - processInStream.close(); + // Make sure {@code processInStream.close()} is called at most once. + if (processInStreamClosed.compareAndSet(false, true)) { + processInStream.close(); + } } // wait for the process to exit by waiting for end-of-file on the named pipe connected // to the state processor - it may take a long time for all the model state to be @@ -209,7 +216,10 @@ public void kill() throws IOException { } finally { try { if (processInStream != null) { - processInStream.close(); + // Make sure {@code processInStream.close()} is called at most once. + if (processInStreamClosed.compareAndSet(false, true)) { + processInStream.close(); + } } } catch (IOException e) { // Ignore it - we're shutting down and the method itself has logged a warning From ad753d5a4aeff27819517b81f15b0c2ff33f7158 Mon Sep 17 00:00:00 2001 From: Przemyslaw Witek Date: Fri, 3 Jan 2020 09:38:49 +0100 Subject: [PATCH 2/2] Unmute testStopAndRestart --- .../org/elasticsearch/xpack/ml/integration/RegressionIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index 777451baca7a9..5ecab6f69d429 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -183,7 +183,6 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception "Finished analysis"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/49680") public void testStopAndRestart() throws Exception { initialize("regression_stop_and_restart"); indexData(sourceIndex, 350, 0);