Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReadableByteChannelPublisher executor leak #1924

Merged
7 changes: 6 additions & 1 deletion common/common/src/main/java/io/helidon/common/LazyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
*
* @param <T> type of the provided object
*/
@FunctionalInterface
public interface LazyValue<T> extends Supplier<T> {
/**
* Create a lazy value from a supplier.
Expand All @@ -50,4 +49,10 @@ static <T> LazyValue<T> create(T value) {
return new LazyValueImpl<>(value);
}

/**
* Return true if the value is loaded, false if the supplier was not invoked.
*
* @return {@code true} if the value is loaded
*/
boolean isLoaded();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class LazyValueImpl<T> implements LazyValue<T> {
this.delegate = supplier;
}

@Override
public boolean isLoaded() {
return loaded;
}

@Override
public T get() {
if (loaded) {
Expand Down
211 changes: 201 additions & 10 deletions common/reactive/src/main/java/io/helidon/common/reactive/IoMulti.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,78 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

import io.helidon.common.Builder;
import io.helidon.common.LazyValue;

/**
* Create reactive stream from standard IO resources.
*/
public interface IoMulti {

/**
* Create a {@link Multi} publisher, to which is possible to publish
* as in to an {@link OutputStream}. In case there is no demand,
* Create an {@link java.io.OutputStream} that provides the data written
* as a {@link Multi}.
* <p>
* In case there is no demand,
* {@link OutputStream#write(byte[], int, int)} methods are blocked
* until downstream request for more data.
*
* @return new {@link Multi} publisher extending {@link OutputStream}
* @deprecated Please use {@link #outputStreamMulti()}
*/
static MultiFromOutputStream createOutputStream() {
return new MultiFromOutputStream();
@Deprecated(since = "2.0.0", forRemoval = true)
static OutputStreamMulti createOutputStream() {
return new OutputStreamMulti();
}

/**
* Creates a builder of the {@link Multi} publisher, to which is possible
* to publish as in to an {@link OutputStream}. In case there is no demand,
* Create an {@link java.io.OutputStream} that provides the data written
* as a {@link Multi}.
* <p>
* In case there is no demand,
* {@link OutputStream#write(byte[], int, int)} methods are blocked
* until downstream request for more data.
*
* @return new {@link OutputStream} implementing {@link Multi}
*/
static OutputStreamMulti outputStreamMulti() {
return new OutputStreamMulti();
}

/**
* Creates a builder of the {@link java.io.OutputStream} that provides data written
* as a {@link io.helidon.common.reactive.Multi}.
*
* @return the builder
* @see #outputStreamMulti()
* @deprecated Please use {@link #outputStreamMultiBuilder()}
*/
@Deprecated(since = "2.0.0", forRemoval = true)
static OutputStreamMultiBuilder builderOutputStream() {
return new OutputStreamMultiBuilder();
}

/**
* Creates a builder of the {@link java.io.OutputStream} that provides data written
* as a {@link io.helidon.common.reactive.Multi}.
*
* @return the builder
* @see #outputStreamMulti()
*/
static OutputStreamMultiBuilder outputStreamMultiBuilder() {
return new OutputStreamMultiBuilder();
}

/**
* Create a {@link Multi} instance that publishes {@link ByteBuffer}s from
* the given {@link InputStream}.
Expand All @@ -67,23 +103,175 @@ static OutputStreamMultiBuilder builderOutputStream() {
* @param inputStream the Stream to publish
* @return Multi
* @throws NullPointerException if {@code stream} is {@code null}
* @deprecated please use {@link #multiFromStream(java.io.InputStream)}
*/
@Deprecated(since = "2.0.0", forRemoval = true)
static Multi<ByteBuffer> createInputStream(final InputStream inputStream) {
return IoMulti.builderInputStream(inputStream)
.build();
}

/**
* Create a {@link Multi} instance that publishes {@link ByteBuffer}s from
* the given {@link InputStream}.
* <p>
* {@link InputStream} is trusted not to block on read operations, in case
* it can't be assured use builder to specify executor for asynchronous waiting
* for blocking reads. {@code IoMulti.builder(is).executor(executorService).build()}.
*
* @param inputStream the Stream to publish
* @return Multi
*/
static Multi<ByteBuffer> multiFromStream(final InputStream inputStream) {
return IoMulti.builderInputStream(inputStream)
.build();
}

/**
* Creates a builder of the {@link Multi} from supplied {@link java.io.InputStream}.
*
* @param inputStream the Stream to publish
* @return the builder
* @deprecated Please use {@link #multiFromStreamBuilder(java.io.InputStream)}
*/
@Deprecated(since = "2.0.0", forRemoval = true)
static MultiFromInputStreamBuilder builderInputStream(final InputStream inputStream) {
Objects.requireNonNull(inputStream);
return new MultiFromInputStreamBuilder(inputStream);
}

/**
* Creates a builder of the {@link Multi} from supplied {@link java.io.InputStream}.
*
* @param inputStream the Stream to publish
* @return the builder
*/
static MultiFromInputStreamBuilder multiFromStreamBuilder(final InputStream inputStream) {
Objects.requireNonNull(inputStream);
return new MultiFromInputStreamBuilder(inputStream);
}

/**
* Creates a multi that reads data from the provided byte channel.
* The multi uses an executor service to process asynchronous reads.
* You can provide a custom executor service using
* {@link #multiFromByteChannelBuilder(java.nio.channels.ReadableByteChannel)}.
*
* @param byteChannel readable byte channel with data
* @return publisher of data from the provided channel
*/
static Multi<ByteBuffer> multiFromByteChannel(ReadableByteChannel byteChannel) {
return multiFromByteChannelBuilder(byteChannel).build();
}

/**
* Creates a builder of {@link Multi} from provided {@link java.nio.channels.ReadableByteChannel}.
*
* @param byteChannel readable byte channel with data
* @return fluent API builder to configure additional details
*/
static MultiFromByteChannelBuilder multiFromByteChannelBuilder(ReadableByteChannel byteChannel) {
return new MultiFromByteChannelBuilder(Objects.requireNonNull(byteChannel));
}

/**
* Fluent API builder for creating a {@link io.helidon.common.reactive.Multi} from a
* {@link java.nio.channels.ReadableByteChannel}.
*/
final class MultiFromByteChannelBuilder implements Builder<Multi<ByteBuffer>> {
private static final int DEFAULT_BUFFER_CAPACITY = 1024 * 8;
private static final RetrySchema DEFAULT_RETRY_SCHEMA = RetrySchema.linear(0, 10, 250);
private static final String THREAD_PREFIX = "multi-rbc-";
private static final AtomicLong COUNTER = new AtomicLong();
private static final ThreadFactory THREAD_FACTORY = r -> new Thread(r, THREAD_PREFIX + COUNTER.incrementAndGet());

private final ReadableByteChannel theChannel;

private LazyValue<ScheduledExecutorService> executor = LazyValue
.create(() -> Executors.newScheduledThreadPool(1, THREAD_FACTORY));
private RetrySchema retrySchema = DEFAULT_RETRY_SCHEMA;
private int bufferCapacity = DEFAULT_BUFFER_CAPACITY;
private boolean externalExecutor;

private MultiFromByteChannelBuilder(ReadableByteChannel theChannel) {
this.theChannel = theChannel;
}

@Override
public Multi<ByteBuffer> build() {
return new MultiFromByteChannel(this);
}

/**
* Configure executor service to use for scheduling reads from the channel.
* If an executor is configured using this method, it will not be terminated when the publisher completes.
*
* @param executor to use for scheduling
* @return updated builder instance
*/
public MultiFromByteChannelBuilder executor(ScheduledExecutorService executor) {
Objects.requireNonNull(executor);

this.executor = LazyValue.create(executor);
this.externalExecutor = true;
return this;
}

/**
* Retry schema to use when reading from the channel.
* If a channel read fails (e.g. no data is read), the read is scheduled using
* {@link #executor} using the provided retry schema, to prolong the delays between retries.
* <p>
* By default the first delay is {@code 0} milliseconds, incrementing by {@code 50 milliseconds} up
* to {@code 250} milliseconds.
*
* @param retrySchema schema to use
* @return updated builder instance
*/
public MultiFromByteChannelBuilder retrySchema(RetrySchema retrySchema) {
Objects.requireNonNull(retrySchema);

this.retrySchema = retrySchema;
return this;
}

/**
* Capacity of byte buffer in number of bytes.
*
* @param bufferCapacity capacity of the buffer, defaults to 8 Kb
* @return updated builder instance
*/
public MultiFromByteChannelBuilder bufferCapacity(int bufferCapacity) {
this.bufferCapacity = bufferCapacity;
return this;
}

ReadableByteChannel theChannel() {
return theChannel;
}

LazyValue<ScheduledExecutorService> executor() {
return executor;
}

RetrySchema retrySchema() {
return retrySchema;
}

int bufferCapacity() {
return bufferCapacity;
}

// we need to know whether to shut the executor down
boolean isExternalExecutor() {
return externalExecutor;
}
}

/**
* Fluent API builder for creating a {@link io.helidon.common.reactive.Multi} from an
* {@link java.io.InputStream}.
*/
final class MultiFromInputStreamBuilder implements Builder<Multi<ByteBuffer>> {

private int bufferSize = 1024;
Expand Down Expand Up @@ -127,9 +315,12 @@ public Multi<ByteBuffer> build() {
}
}

final class OutputStreamMultiBuilder implements Builder<MultiFromOutputStream> {
final class OutputStreamMultiBuilder implements Builder<OutputStreamMulti> {

private final OutputStreamMulti streamMulti = new OutputStreamMulti();

private final MultiFromOutputStream streamMulti = new MultiFromOutputStream();
private OutputStreamMultiBuilder() {
}

/**
* Set max timeout for which is allowed to block write methods,
Expand All @@ -154,13 +345,13 @@ public OutputStreamMultiBuilder timeout(long timeout, TimeUnit unit) {
*
* @param requestCallback to be executed
*/
public OutputStreamMultiBuilder onRequest(BiConsumer<Long, Long> requestCallback){
public OutputStreamMultiBuilder onRequest(BiConsumer<Long, Long> requestCallback) {
streamMulti.onRequest(requestCallback);
return this;
}

@Override
public MultiFromOutputStream build() {
public OutputStreamMulti build() {
return streamMulti;
}
}
Expand Down
Loading