diff --git a/org.eclipse.lsp4e/src/org/eclipse/lsp4e/LaunchConfigurationStreamProvider.java b/org.eclipse.lsp4e/src/org/eclipse/lsp4e/LaunchConfigurationStreamProvider.java index e21014d2e..3a0725982 100644 --- a/org.eclipse.lsp4e/src/org/eclipse/lsp4e/LaunchConfigurationStreamProvider.java +++ b/org.eclipse.lsp4e/src/org/eclipse/lsp4e/LaunchConfigurationStreamProvider.java @@ -8,6 +8,7 @@ * * Contributors: * Mickael Istria (Red Hat Inc.) - initial implementation + * Sebastian Thomschke - re-implement StreamProxyInputStream for better performance *******************************************************************************/ package org.eclipse.lsp4e; @@ -17,10 +18,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.lang.reflect.Method; -import java.nio.charset.Charset; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -45,8 +44,8 @@ import org.eclipse.lsp4e.server.StreamConnectionProvider; /** - * Access and control IO streams from a Launch Configuration to connect - * them to language server protocol client. + * Access and control IO streams from a Launch Configuration to connect them to + * language server protocol client. */ public class LaunchConfigurationStreamProvider implements StreamConnectionProvider, IAdaptable { @@ -60,43 +59,76 @@ public class LaunchConfigurationStreamProvider implements StreamConnectionProvid protected static class StreamProxyInputStream extends InputStream implements IStreamListener { - private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + private static final int EOF = -1; + private static final byte[] NO_DATA = new byte[0]; + + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); private final IProcess process; + private byte[] currentBuffer = NO_DATA; + private int currentBufferPos = 0; public StreamProxyInputStream(IProcess process) { this.process = process; } @Override - public void streamAppended(String text, IStreamMonitor monitor) { - byte[] bytes = text.getBytes(Charset.defaultCharset()); - List bytesAsList = new ArrayList<>(bytes.length); - for (byte b : bytes) { - bytesAsList.add(b); + public void streamAppended(final String text, final IStreamMonitor monitor) { + final byte[] bytes = text.getBytes(); + if (bytes.length > 0) { + queue.offer(bytes); } - queue.addAll(bytesAsList); } @Override public int read() throws IOException { - while (queue.isEmpty()) { - if (this.process.isTerminated()) { - return -1; - } - try { - Thread.sleep(5, 0); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + if (currentBufferPos >= currentBuffer.length && !fillCurrentBuffer()) { + return EOF; + } + return currentBuffer[currentBufferPos++] & 0xFF; + } + + @Override + public int read(final byte[] buf, final int off, final int len) throws IOException { + Objects.checkFromIndexSize(off, len, buf.length); + if (len == 0) { + return 0; + } + + int totalBytesRead = 0; + while (totalBytesRead < len) { + if (currentBufferPos >= currentBuffer.length && !fillCurrentBuffer()) { + return totalBytesRead == 0 ? EOF : totalBytesRead; } + + final int bytesToRead = Math.min(len - totalBytesRead, currentBuffer.length - currentBufferPos); + System.arraycopy(currentBuffer, currentBufferPos, buf, off + totalBytesRead, bytesToRead); + currentBufferPos += bytesToRead; + totalBytesRead += bytesToRead; } - return castNonNull(queue.poll()); + return totalBytesRead; } @Override public int available() throws IOException { - return queue.size(); + return (currentBuffer.length - currentBufferPos) + queue.stream().mapToInt(arr -> arr.length).sum(); } + private boolean fillCurrentBuffer() throws IOException { + try { + while (queue.isEmpty()) { + if (process.isTerminated()) { + return false; + } + Thread.sleep(5); + } + currentBuffer = queue.remove(); + currentBufferPos = 0; + return currentBuffer.length > 0; + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IOException("Thread interrupted while reading.", ex); //$NON-NLS-1$ + } + } } public LaunchConfigurationStreamProvider(ILaunchConfiguration launchConfig, @Nullable Set launchModes) { @@ -116,8 +148,8 @@ public boolean equals(@Nullable Object obj) { return true; } return obj instanceof LaunchConfigurationStreamProvider other && // - this.launchConfiguration.equals(other.launchConfiguration) && // - this.launchModes.equals(other.launchModes); + this.launchConfiguration.equals(other.launchConfiguration) && // + this.launchModes.equals(other.launchModes); } @Override @@ -147,8 +179,8 @@ public void start() throws IOException { // the IDE when Outline is displayed. boolean statusHandlerToUpdate = disableStatusHandler(); try { - final var launch = this.launch = this.launchConfiguration.launch(this.launchModes.iterator().next(), new NullProgressMonitor(), - false); + final var launch = this.launch = this.launchConfiguration.launch(this.launchModes.iterator().next(), + new NullProgressMonitor(), false); long initialTimestamp = System.currentTimeMillis(); while (launch.getProcesses().length == 0 && System.currentTimeMillis() - initialTimestamp < 5000) { try { @@ -162,7 +194,7 @@ public void start() throws IOException { final var process = this.process = launch.getProcesses()[0]; final var inputStream = this.inputStream = new StreamProxyInputStream(process); final var proxy = process.getStreamsProxy(); - if(proxy != null) { + if (proxy != null) { final var mon = proxy.getOutputStreamMonitor(); if (mon != null) { mon.addListener(inputStream); @@ -178,7 +210,7 @@ public void start() throws IOException { LanguageServerPlugin.logError(ex); } final var errorStream = this.errorStream = new StreamProxyInputStream(process); - if(proxy != null) { + if (proxy != null) { final var mon = proxy.getErrorStreamMonitor(); if (mon != null) { mon.addListener(errorStream); @@ -187,8 +219,7 @@ public void start() throws IOException { } } catch (Exception e) { LanguageServerPlugin.logError(e); - } - finally { + } finally { if (statusHandlerToUpdate) { setStatusHandler(true); } @@ -223,7 +254,7 @@ private void setStatusHandler(boolean enabled) { @Override public @Nullable T getAdapter(@Nullable Class adapter) { - if(adapter == ProcessHandle.class && process != null) { + if (adapter == ProcessHandle.class && process != null) { return process.getAdapter(adapter); } return null;