diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index 777451baca7a9..5ecab6f69d429 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -183,7 +183,6 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception "Finished analysis"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/49680") public void testStopAndRestart() throws Exception { initialize("regression_stop_and_restart"); indexData(sourceIndex, 350, 0); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index 62c060de03a50..33c2dc5ac6ef6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -30,6 +30,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; /** @@ -44,6 +45,9 @@ public abstract class AbstractNativeProcess implements NativeProcess { private final String jobId; private final CppLogMessageHandler cppLogHandler; private final OutputStream processInStream; + // We need this as in Java 8 closing {@link FilterOutputStream} is not idempotent (i.e. cannot be performed twice). + // For more details regarding the underlying issue see https://bugs.openjdk.java.net/browse/JDK-8054565 + private final AtomicBoolean processInStreamClosed = new AtomicBoolean(); private final InputStream processOutStream; private final OutputStream processRestoreStream; private final LengthEncodedWriter recordWriter; @@ -163,7 +167,10 @@ public void close() throws IOException { processCloseInitiated = true; // closing its input causes the process to exit if (processInStream != null) { - processInStream.close(); + // Make sure {@code processInStream.close()} is called at most once. + if (processInStreamClosed.compareAndSet(false, true)) { + processInStream.close(); + } } // wait for the process to exit by waiting for end-of-file on the named pipe connected // to the state processor - it may take a long time for all the model state to be @@ -209,7 +216,10 @@ public void kill() throws IOException { } finally { try { if (processInStream != null) { - processInStream.close(); + // Make sure {@code processInStream.close()} is called at most once. + if (processInStreamClosed.compareAndSet(false, true)) { + processInStream.close(); + } } } catch (IOException e) { // Ignore it - we're shutting down and the method itself has logged a warning