Skip to content

Commit

Permalink
perf: improve StreamProxyInputStream
Browse files Browse the repository at this point in the history
  • Loading branch information
sebthom authored and mickaelistria committed Aug 17, 2024
1 parent 088cf9c commit 1d6a5f5
Showing 1 changed file with 62 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*
* Contributors:
* Mickael Istria (Red Hat Inc.) - initial implementation
* Sebastian Thomschke - re-implement StreamProxyInputStream for better performance
*******************************************************************************/
package org.eclipse.lsp4e;

Expand All @@ -17,10 +18,8 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

Expand All @@ -45,8 +44,8 @@
import org.eclipse.lsp4e.server.StreamConnectionProvider;

/**
* Access and control IO streams from a Launch Configuration to connect
* them to language server protocol client.
* Access and control IO streams from a Launch Configuration to connect them to
* language server protocol client.
*/
public class LaunchConfigurationStreamProvider implements StreamConnectionProvider, IAdaptable {

Expand All @@ -60,43 +59,76 @@ public class LaunchConfigurationStreamProvider implements StreamConnectionProvid

protected static class StreamProxyInputStream extends InputStream implements IStreamListener {

private final ConcurrentLinkedQueue<Byte> queue = new ConcurrentLinkedQueue<>();
private static final int EOF = -1;
private static final byte[] NO_DATA = new byte[0];

private final ConcurrentLinkedQueue<byte[]> queue = new ConcurrentLinkedQueue<>();
private final IProcess process;
private byte[] currentBuffer = NO_DATA;
private int currentBufferPos = 0;

public StreamProxyInputStream(IProcess process) {
this.process = process;
}

@Override
public void streamAppended(String text, IStreamMonitor monitor) {
byte[] bytes = text.getBytes(Charset.defaultCharset());
List<Byte> bytesAsList = new ArrayList<>(bytes.length);
for (byte b : bytes) {
bytesAsList.add(b);
public void streamAppended(final String text, final IStreamMonitor monitor) {
final byte[] bytes = text.getBytes();
if (bytes.length > 0) {
queue.offer(bytes);
}
queue.addAll(bytesAsList);
}

@Override
public int read() throws IOException {
while (queue.isEmpty()) {
if (this.process.isTerminated()) {
return -1;
}
try {
Thread.sleep(5, 0);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (currentBufferPos >= currentBuffer.length && !fillCurrentBuffer()) {
return EOF;
}
return currentBuffer[currentBufferPos++] & 0xFF;
}

@Override
public int read(final byte[] buf, final int off, final int len) throws IOException {
Objects.checkFromIndexSize(off, len, buf.length);
if (len == 0) {
return 0;
}

int totalBytesRead = 0;
while (totalBytesRead < len) {
if (currentBufferPos >= currentBuffer.length && !fillCurrentBuffer()) {
return totalBytesRead == 0 ? EOF : totalBytesRead;
}

final int bytesToRead = Math.min(len - totalBytesRead, currentBuffer.length - currentBufferPos);
System.arraycopy(currentBuffer, currentBufferPos, buf, off + totalBytesRead, bytesToRead);
currentBufferPos += bytesToRead;
totalBytesRead += bytesToRead;
}
return castNonNull(queue.poll());
return totalBytesRead;
}

@Override
public int available() throws IOException {
return queue.size();
return (currentBuffer.length - currentBufferPos) + queue.stream().mapToInt(arr -> arr.length).sum();
}

private boolean fillCurrentBuffer() throws IOException {
try {
while (queue.isEmpty()) {
if (process.isTerminated()) {
return false;
}
Thread.sleep(5);
}
currentBuffer = queue.remove();
currentBufferPos = 0;
return currentBuffer.length > 0;
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("Thread interrupted while reading.", ex); //$NON-NLS-1$
}
}
}

public LaunchConfigurationStreamProvider(ILaunchConfiguration launchConfig, @Nullable Set<String> launchModes) {
Expand All @@ -116,8 +148,8 @@ public boolean equals(@Nullable Object obj) {
return true;
}
return obj instanceof LaunchConfigurationStreamProvider other && //
this.launchConfiguration.equals(other.launchConfiguration) && //
this.launchModes.equals(other.launchModes);
this.launchConfiguration.equals(other.launchConfiguration) && //
this.launchModes.equals(other.launchModes);
}

@Override
Expand Down Expand Up @@ -147,8 +179,8 @@ public void start() throws IOException {
// the IDE when Outline is displayed.
boolean statusHandlerToUpdate = disableStatusHandler();
try {
final var launch = this.launch = this.launchConfiguration.launch(this.launchModes.iterator().next(), new NullProgressMonitor(),
false);
final var launch = this.launch = this.launchConfiguration.launch(this.launchModes.iterator().next(),
new NullProgressMonitor(), false);
long initialTimestamp = System.currentTimeMillis();
while (launch.getProcesses().length == 0 && System.currentTimeMillis() - initialTimestamp < 5000) {
try {
Expand All @@ -162,7 +194,7 @@ public void start() throws IOException {
final var process = this.process = launch.getProcesses()[0];
final var inputStream = this.inputStream = new StreamProxyInputStream(process);
final var proxy = process.getStreamsProxy();
if(proxy != null) {
if (proxy != null) {
final var mon = proxy.getOutputStreamMonitor();
if (mon != null) {
mon.addListener(inputStream);
Expand All @@ -178,7 +210,7 @@ public void start() throws IOException {
LanguageServerPlugin.logError(ex);
}
final var errorStream = this.errorStream = new StreamProxyInputStream(process);
if(proxy != null) {
if (proxy != null) {
final var mon = proxy.getErrorStreamMonitor();
if (mon != null) {
mon.addListener(errorStream);
Expand All @@ -187,8 +219,7 @@ public void start() throws IOException {
}
} catch (Exception e) {
LanguageServerPlugin.logError(e);
}
finally {
} finally {
if (statusHandlerToUpdate) {
setStatusHandler(true);
}
Expand Down Expand Up @@ -223,7 +254,7 @@ private void setStatusHandler(boolean enabled) {

@Override
public <T> @Nullable T getAdapter(@Nullable Class<T> adapter) {
if(adapter == ProcessHandle.class && process != null) {
if (adapter == ProcessHandle.class && process != null) {
return process.getAdapter(adapter);
}
return null;
Expand Down

0 comments on commit 1d6a5f5

Please sign in to comment.