From 4d7a2aab7afa41b9322fbfaf860a166036058410 Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Tue, 12 May 2020 13:46:22 +0200 Subject: [PATCH 1/3] Multi from InputStream --- .../io/helidon/common/reactive/Multi.java | 35 ++++ .../common/reactive/MultiByteBuffer.java | 34 ++++ .../MultiFromBlockingInputStream.java | 102 ++++++++++++ .../common/reactive/MultiFromInputStream.java | 155 ++++++++++++++++++ .../reactive/MultiFromInputStreamTest.java | 123 ++++++++++++++ ...MultiFromNotTrustedInputStreamTckTest.java | 33 ++++ .../MultiFromTrustedInputStreamTckTest.java | 72 ++++++++ 7 files changed, 554 insertions(+) create mode 100644 common/reactive/src/main/java/io/helidon/common/reactive/MultiByteBuffer.java create mode 100644 common/reactive/src/main/java/io/helidon/common/reactive/MultiFromBlockingInputStream.java create mode 100644 common/reactive/src/main/java/io/helidon/common/reactive/MultiFromInputStream.java create mode 100644 common/reactive/src/test/java/io/helidon/common/reactive/MultiFromInputStreamTest.java create mode 100644 common/reactive/src/test/java/io/helidon/common/reactive/MultiFromNotTrustedInputStreamTckTest.java create mode 100644 common/reactive/src/test/java/io/helidon/common/reactive/MultiFromTrustedInputStreamTckTest.java diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java b/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java index f81f4d5e3c3..3bc52499070 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java @@ -15,6 +15,8 @@ */ package io.helidon.common.reactive; +import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -22,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; @@ -230,6 +233,38 @@ static Multi from(Stream stream) { return new MultiFromStream<>(stream); } + /** + * Create a {@link Multi} instance that publishes {@link ByteBuffer}s from the given {@link InputStream}. + *

+ * If {@link InputStream} can be trusted not to block on read operations, use faster + * {@link Multi#from(java.io.InputStream)}. + * + * @param inputStream the Stream to publish + * @param executor executor to use for waiting at {@link InputStream}'s blocking reads + * @return Multi + * @throws NullPointerException if {@code stream} is {@code null} + */ + static MultiByteBuffer from(InputStream inputStream, ExecutorService executor) { + Objects.requireNonNull(inputStream, "stream is null"); + Objects.requireNonNull(executor, "executor is null"); + return new MultiFromBlockingInputStream(inputStream, 4, executor); + } + + /** + * Create a {@link Multi} instance that publishes {@link ByteBuffer}s from the given {@link InputStream}. + *

+ * {@link InputStream} is trusted not to block on read operations, in case it can't be assured use + * {@link Multi#from(java.io.InputStream, java.util.concurrent.ExecutorService)}. + * + * @param inputStream the Stream to publish + * @return Multi + * @throws NullPointerException if {@code stream} is {@code null} + */ + static MultiByteBuffer from(InputStream inputStream) { + Objects.requireNonNull(inputStream, "stream is null"); + return new MultiFromInputStream(inputStream, 4); + } + /** * Signal 0L, 1L and so on periodically to the downstream. *

diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiByteBuffer.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiByteBuffer.java new file mode 100644 index 00000000000..5284c0539fc --- /dev/null +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiByteBuffer.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.common.reactive; + +import java.nio.ByteBuffer; + +/** + * Multi publishing stream of {@link ByteBuffer}s. + */ +public interface MultiByteBuffer extends Multi { + + /** + * Set the size of {@link ByteBuffer}s being published. + * + * @param bufferSize size of the {@link ByteBuffer} + * @return Multi + */ + Multi withByteBufferSize(int bufferSize); +} diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiFromBlockingInputStream.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiFromBlockingInputStream.java new file mode 100644 index 00000000000..359e7838c1f --- /dev/null +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiFromBlockingInputStream.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.reactive; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Flow; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntSupplier; + +class MultiFromBlockingInputStream extends MultiFromInputStream { + + private final InputStream inputStream; + private IntSupplier bufferSizeSupplier; + private final ExecutorService executorService; + + MultiFromBlockingInputStream(InputStream inputStream, int bufferSize, ExecutorService executorService) { + super(inputStream, bufferSize); + this.inputStream = inputStream; + this.bufferSizeSupplier = () -> bufferSize; + this.executorService = executorService; + } + + @Override + public void subscribe(final Flow.Subscriber subscriber) { + Objects.requireNonNull(subscriber, "subscriber is null"); + try { + inputStream.available(); + } catch (IOException e) { + subscriber.onSubscribe(EmptySubscription.INSTANCE); + subscriber.onError(e); + return; + } + InputStreamSubscription subscription = new InputStreamSubscription( + subscriber, + inputStream, + bufferSizeSupplier.getAsInt(), + executorService); + subscriber.onSubscribe(subscription); + } + + @Override + public Multi withByteBufferSize(final int bufferSize) { + this.bufferSizeSupplier = () -> bufferSize; + return this; + } + + static final class InputStreamSubscription extends MultiFromInputStream.InputStreamSubscription { + + private final ExecutorService executorService; + private final LinkedBlockingQueue submitQueue = new LinkedBlockingQueue<>(); + + private final AtomicBoolean draining = new AtomicBoolean(false); + + InputStreamSubscription(Flow.Subscriber downstream, + InputStream inputStream, + final int bufferSize, + ExecutorService executorService) { + super(downstream, inputStream, bufferSize); + this.executorService = executorService; + } + + protected void trySubmit(long n) { + submitQueue.add(() -> { + submit(n); + drainSubmitQueue(); + }); + drainSubmitQueue(); + } + + private void drainSubmitQueue() { + if (!draining.getAndSet(true)) { + try { + Runnable job = submitQueue.poll(); + if (job != null) { + executorService.submit(job); + } + } finally { + draining.set(false); + } + } + } + } +} diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiFromInputStream.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiFromInputStream.java new file mode 100644 index 00000000000..b1222c862d0 --- /dev/null +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiFromInputStream.java @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.reactive; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.IntSupplier; + +class MultiFromInputStream implements MultiByteBuffer { + + private final InputStream inputStream; + private IntSupplier bufferSizeSupplier; + + MultiFromInputStream(InputStream inputStream, int bufferSize) { + this.inputStream = inputStream; + this.bufferSizeSupplier = () -> bufferSize; + } + + @Override + public void subscribe(final Flow.Subscriber subscriber) { + Objects.requireNonNull(subscriber, "subscriber is null"); + try { + inputStream.available(); + } catch (IOException e) { + subscriber.onSubscribe(EmptySubscription.INSTANCE); + subscriber.onError(e); + return; + } + InputStreamSubscription subscription = new InputStreamSubscription( + subscriber, + inputStream, + bufferSizeSupplier.getAsInt()); + subscriber.onSubscribe(subscription); + } + + public Multi withByteBufferSize(final int bufferSize) { + this.bufferSizeSupplier = () -> bufferSize; + return this; + } + + static class InputStreamSubscription extends AtomicLong implements Flow.Subscription { + + private final Flow.Subscriber downstream; + private final int bufferSize; + private InputStream inputStream; + + private volatile int canceled; + + static final int NORMAL_CANCEL = 1; + static final int BAD_REQUEST = 2; + + InputStreamSubscription(Flow.Subscriber downstream, + InputStream inputStream, + final int bufferSize) { + this.downstream = downstream; + this.inputStream = inputStream; + this.bufferSize = bufferSize; + } + + protected void submit(long n) { + long emitted = 0L; + Flow.Subscriber downstream = this.downstream; + + for (;;) { + while (emitted != n) { + int isCanceled = canceled; + if (isCanceled != 0) { + inputStream = null; + if (isCanceled == BAD_REQUEST) { + downstream.onError(new IllegalArgumentException( + "Rule ยง3.9 violated: non-positive request amount is forbidden")); + } + return; + } + + ByteBuffer value; + + try { + value = ByteBuffer.wrap(inputStream.readNBytes(bufferSize)); + } catch (Throwable ex) { + inputStream = null; + canceled = NORMAL_CANCEL; + downstream.onError(ex); + return; + } + + if (value.limit() == 0) { + inputStream = null; + downstream.onComplete(); + return; + } + + downstream.onNext(value); + + if (canceled != 0) { + continue; + } + + emitted++; + } + + n = get(); + if (n == emitted) { + n = SubscriptionHelper.produced(this, emitted); + if (n == 0L) { + return; + } + emitted = 0L; + } + } + } + + @Override + public void request(long n) { + if (n <= 0L) { + canceled = BAD_REQUEST; + n = 1; // for cleanup + } + + if (SubscriptionHelper.addRequest(this, n) != 0L) { + return; + } + + trySubmit(n); + } + + protected void trySubmit(long n) { + submit(n); + } + + @Override + public void cancel() { + canceled = NORMAL_CANCEL; + request(1); // for cleanup + } + } +} diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromInputStreamTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromInputStreamTest.java new file mode 100644 index 00000000000..2a2532c9def --- /dev/null +++ b/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromInputStreamTest.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.common.reactive; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.RepetitionInfo; +import org.junit.jupiter.api.Test; + +public class MultiFromInputStreamTest { + + private static ExecutorService executorService; + + @BeforeAll + static void beforeAll() { + executorService = Executors.newFixedThreadPool(4); + } + + @AfterAll + static void afterAll() { + executorService.shutdown(); + } + + @Test + public void testInputStream() { + byte[] initialArray = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 6, 4, 5, 78, 55, 44}; + + InputStream is = new ByteArrayInputStream(initialArray); + + List result = Multi.from(is) + .flatMapIterable((ByteBuffer b) -> { + List list = new LinkedList<>(); + while (b.remaining() > 0) { + list.add(b.get()); + } + return list; + }) + .collectList() + .await(100, TimeUnit.MILLISECONDS); + assertThat(result, equalTo(toList(initialArray))); + } + + @RepeatedTest(value = 20, name = "buffer size {currentRepetition}") + void longStringTrustedStream(RepetitionInfo repetitionInfo) { + var bufferSize = repetitionInfo.getCurrentRepetition(); + longString(is -> Multi.from(is) + .withByteBufferSize(bufferSize)); + } + + @RepeatedTest(value = 20, name = "buffer size {currentRepetition}") + void longStringNotTrustedStream(RepetitionInfo repetitionInfo) { + var bufferSize = repetitionInfo.getCurrentRepetition(); + longString(is -> Multi.from(is, executorService) + .withByteBufferSize(bufferSize)); + } + + private void longString(Function> pubCreator) { + final var STRING_LENGTH = 200_000; + final var token = "Lorem ipsum ".toCharArray(); + final var sb = new StringBuilder(); + + byte token_index = 0; + for (int i = 0; i < STRING_LENGTH; i++) { + sb.append(token[token_index++]); + if (token_index == token.length) { + token_index = 0; + } + } + + final var expected = sb.toString(); + + InputStream is = new ByteArrayInputStream(expected.getBytes(StandardCharsets.UTF_8)); + + String result = pubCreator.apply(is) + .map(StandardCharsets.UTF_8::decode) + .map(CharBuffer::toString) + .map(CharSequence.class::cast) + .collectStream(Collectors.joining()) + .await(100, TimeUnit.SECONDS); + + assertThat(result, equalTo(expected)); + } + + private static List toList(byte[] array) { + List result = new ArrayList<>(array.length); + for (byte b : array) { + result.add(b); + } + return result; + } +} diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromNotTrustedInputStreamTckTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromNotTrustedInputStreamTckTest.java new file mode 100644 index 00000000000..ebc0d43fda1 --- /dev/null +++ b/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromNotTrustedInputStreamTckTest.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.helidon.common.reactive; + +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.Executors; + +import org.testng.annotations.Test; + +@Test +public class MultiFromNotTrustedInputStreamTckTest extends MultiFromTrustedInputStreamTckTest { + static final int BUFFER_SIZE = 4; + + protected Multi getPublisher(InputStream is) { + return Multi.from(is, Executors.newFixedThreadPool(4)).withByteBufferSize(BUFFER_SIZE); + } + +} diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromTrustedInputStreamTckTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromTrustedInputStreamTckTest.java new file mode 100644 index 00000000000..6a297be3d22 --- /dev/null +++ b/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromTrustedInputStreamTckTest.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.helidon.common.reactive; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicLong; + +import org.reactivestreams.tck.TestEnvironment; +import org.reactivestreams.tck.flow.FlowPublisherVerification; +import org.testng.annotations.Test; + +@Test +public class MultiFromTrustedInputStreamTckTest extends FlowPublisherVerification { + static final int BUFFER_SIZE = 4; + + public MultiFromTrustedInputStreamTckTest() { + super(new TestEnvironment(50)); + } + + @Override + public Flow.Publisher createFlowPublisher(long l) { + AtomicLong remaining = new AtomicLong(l * BUFFER_SIZE); + final byte[] theByte = {0}; + InputStream is = new InputStream() { + @Override + public int read() { + if (0 == remaining.getAndDecrement()) { + return -1; + } + return theByte[0]++; + } + }; + return getPublisher(is); + } + + protected Multi getPublisher(InputStream is) { + return Multi.from(is).withByteBufferSize(BUFFER_SIZE); + } + + @Override + public Flow.Publisher createFailedFlowPublisher() { + InputStream is = new InputStream() { + @Override + public int read() throws IOException { + throw new IOException("BOOM!!!"); + } + + @Override + public int available() throws IOException { + throw new IOException("BOOM!!!"); + } + }; + return getPublisher(is); + } +} From 864d7bb5d72f573f67d2729a60688f555b100f31 Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Thu, 28 May 2020 15:42:45 +0200 Subject: [PATCH 2/3] Switch to MultiFromInputStream Signed-off-by: Daniel Kec --- .../jersey/InputStreamPublisher.java | 110 ---------------- .../jersey/InputStreamPublisherTest.java | 26 ++-- .../webserver/ClassPathContentHandler.java | 6 +- .../webserver/InputStreamPublisher.java | 118 ------------------ 4 files changed, 14 insertions(+), 246 deletions(-) delete mode 100644 security/integration/jersey/src/main/java/io/helidon/security/integration/jersey/InputStreamPublisher.java delete mode 100644 webserver/webserver/src/main/java/io/helidon/webserver/InputStreamPublisher.java diff --git a/security/integration/jersey/src/main/java/io/helidon/security/integration/jersey/InputStreamPublisher.java b/security/integration/jersey/src/main/java/io/helidon/security/integration/jersey/InputStreamPublisher.java deleted file mode 100644 index 3d83b52af83..00000000000 --- a/security/integration/jersey/src/main/java/io/helidon/security/integration/jersey/InputStreamPublisher.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.helidon.security.integration.jersey; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Flow; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Publisher that reads data from an input stream and publishes them as {@link ByteBuffer} events. - */ -@SuppressWarnings("Duplicates") -class InputStreamPublisher implements Flow.Publisher { - private final InputStream inputStream; - private final byte[] buffer; - - private final SingleSubscriberHolder subscriber = new SingleSubscriberHolder<>(); - - private final RequestedCounter requested = new RequestedCounter(); - private final AtomicBoolean publishing = new AtomicBoolean(false); - - /** - * Create new input stream publisher that reads data from a supplied input stream and publishes them a single subscriber. - *

- * Note that this implementation does not rely on any asynchronous processing and its business logic is always invoked - * on the subscriber thread (as part of {@link #subscribe(Flow.Subscriber)} and {@link Flow.Subscription#request(long)} - * method calls). - * - * @param inputStream underlying input stream to be used to read the data tu be published as events. - * @param bufferSize maximum published event data buffer size. - */ - InputStreamPublisher(InputStream inputStream, int bufferSize) { - this.inputStream = inputStream; - this.buffer = new byte[bufferSize]; - } - - @Override - public void subscribe(Flow.Subscriber subscriberParam) { - if (subscriber.register(subscriberParam)) { - publishing.set(true); // prevent onNext from inside of onSubscribe - - try { - subscriberParam.onSubscribe(new Flow.Subscription() { - @Override - public void request(long n) { - requested.increment(n, t -> tryComplete(t)); - tryPublish(); - } - - @Override - public void cancel() { - } - }); - } finally { - publishing.set(false); - } - - tryPublish(); // give onNext a chance in case request has been invoked in onSubscribe - } - } - - private void tryPublish() { - while (!subscriber.isClosed() && (requested.get() > 0) && publishing.compareAndSet(false, true)) { - try { - final Flow.Subscriber sub = this.subscriber.get(); // blocking retrieval - - while (!subscriber.isClosed() && requested.tryDecrement()) { - int len = inputStream.read(buffer); - if (len >= 0) { - sub.onNext(ByteBuffer.wrap(buffer, 0, len)); - } else { - tryComplete(); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - tryComplete(e); - } catch (IOException | ExecutionException e) { - tryComplete(e); - } finally { - publishing.set(false); // give a chance to some other thread to publish - } - } - } - - private void tryComplete() { - subscriber.close(Flow.Subscriber::onComplete); - } - - private void tryComplete(Throwable t) { - subscriber.close(sub -> sub.onError(t)); - } -} diff --git a/security/integration/jersey/src/test/java/io/helidon/security/integration/jersey/InputStreamPublisherTest.java b/security/integration/jersey/src/test/java/io/helidon/security/integration/jersey/InputStreamPublisherTest.java index 982fd62e5b0..9b11bfcf1a9 100644 --- a/security/integration/jersey/src/test/java/io/helidon/security/integration/jersey/InputStreamPublisherTest.java +++ b/security/integration/jersey/src/test/java/io/helidon/security/integration/jersey/InputStreamPublisherTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2018, 2020 Oracle and/or its affiliates. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,22 +25,22 @@ import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.Test; +import io.helidon.common.reactive.Multi; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import org.junit.jupiter.api.Test; + /** - * Unit test for {@link InputStreamPublisher}. + * Unit test for {@code InputStreamPublisher}'s replacement. */ public class InputStreamPublisherTest { @Test public void testSingle() throws InterruptedException { String teststring = "My text to publish with publisher"; - InputStreamPublisher p = - new InputStreamPublisher( - new ByteArrayInputStream(teststring.getBytes(StandardCharsets.UTF_8)), - 1024); + Multi p = Multi.from(new ByteArrayInputStream(teststring.getBytes(StandardCharsets.UTF_8))) + .withByteBufferSize(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); CountDownLatch cdl = new CountDownLatch(1); @@ -83,10 +83,8 @@ public void onComplete() { @Test public void testMultiple() throws InterruptedException { String teststring = "My text to publish with publisher"; - InputStreamPublisher p = - new InputStreamPublisher( - new ByteArrayInputStream(teststring.getBytes(StandardCharsets.UTF_8)), - 1); + Multi p = Multi.from(new ByteArrayInputStream(teststring.getBytes(StandardCharsets.UTF_8))) + .withByteBufferSize(1); ByteArrayOutputStream baos = new ByteArrayOutputStream(); CountDownLatch cdl = new CountDownLatch(1); @@ -136,10 +134,8 @@ public void testVeryLong() throws IOException, InterruptedException { } teststring = expectedResult.toString(); - InputStreamPublisher p = - new InputStreamPublisher( - new ByteArrayInputStream(teststring.getBytes(StandardCharsets.UTF_8)), - 2); + Multi p = Multi.from(new ByteArrayInputStream(teststring.getBytes(StandardCharsets.UTF_8))) + .withByteBufferSize(2); ByteArrayOutputStream baos = new ByteArrayOutputStream(); CountDownLatch cdl = new CountDownLatch(1); diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/ClassPathContentHandler.java b/webserver/webserver/src/main/java/io/helidon/webserver/ClassPathContentHandler.java index af923536c9d..8c07c7b4207 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/ClassPathContentHandler.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/ClassPathContentHandler.java @@ -195,9 +195,9 @@ private void sendUrlStream(Http.RequestMethod method, URL url, ServerRequest req } InputStream in = url.openStream(); - InputStreamPublisher byteBufPublisher = new InputStreamPublisher(in, 2048); - response.send(Multi.from(byteBufPublisher) - .map(DataChunk::create)); + response.send(Multi.from(in) + .withByteBufferSize(2048) + .map(DataChunk::create)); } static String fileName(URL url) { diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/InputStreamPublisher.java b/webserver/webserver/src/main/java/io/helidon/webserver/InputStreamPublisher.java deleted file mode 100644 index 40c7023f552..00000000000 --- a/webserver/webserver/src/main/java/io/helidon/webserver/InputStreamPublisher.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.helidon.webserver; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Flow; -import java.util.concurrent.atomic.AtomicBoolean; - -import io.helidon.common.reactive.RequestedCounter; -import io.helidon.common.reactive.SingleSubscriberHolder; - -/** - * Publisher that reads data from an input stream and publishes them as {@link java.nio.ByteBuffer} events. - * - * @deprecated Until new reactive library is merged, we need to copy this class here - */ -@SuppressWarnings("Duplicates") -@Deprecated -class InputStreamPublisher implements Flow.Publisher { - private final InputStream inputStream; - private final byte[] buffer; - - private final SingleSubscriberHolder subscriber = new SingleSubscriberHolder<>(); - - private final RequestedCounter requested = new RequestedCounter(); - private final AtomicBoolean publishing = new AtomicBoolean(false); - - /** - * Create new input stream publisher that reads data from a supplied input stream and publishes them a single subscriber. - *

- * Note that this implementation does not rely on any asynchronous processing and its business logic is always invoked - * on the subscriber thread (as part of {@link #subscribe(java.util.concurrent.Flow.Subscriber)} and - * {@link java.util.concurrent.Flow.Subscription#request(long)} - * method calls). - * - * @param inputStream underlying input stream to be used to read the data tu be published as events. - * @param bufferSize maximum published event data buffer size. - */ - InputStreamPublisher(InputStream inputStream, int bufferSize) { - this.inputStream = inputStream; - this.buffer = new byte[bufferSize]; - } - - @Override - public void subscribe(Flow.Subscriber subscriberParam) { - if (subscriber.register(subscriberParam)) { - publishing.set(true); // prevent onNext from inside of onSubscribe - - try { - subscriberParam.onSubscribe(new Flow.Subscription() { - @Override - public void request(long n) { - requested.increment(n, t -> tryComplete(t)); - tryPublish(); - } - - @Override - public void cancel() { - } - }); - } finally { - publishing.set(false); - } - - tryPublish(); // give onNext a chance in case request has been invoked in onSubscribe - } - } - - private void tryPublish() { - while (!subscriber.isClosed() && (requested.get() > 0) && publishing.compareAndSet(false, true)) { - try { - final Flow.Subscriber sub = this.subscriber.get(); // blocking retrieval - - while (!subscriber.isClosed() && requested.tryDecrement()) { - int len = inputStream.read(buffer); - if (len >= 0) { - sub.onNext(ByteBuffer.wrap(buffer, 0, len)); - } else { - inputStream.close(); - tryComplete(); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - tryComplete(e); - } catch (IOException | ExecutionException e) { - tryComplete(e); - } finally { - publishing.set(false); // give a chance to some other thread to publish - } - } - } - - private void tryComplete() { - subscriber.close(Flow.Subscriber::onComplete); - } - - private void tryComplete(Throwable t) { - subscriber.close(sub -> sub.onError(t)); - } -} From 3664001010f75837fd8d224425326766d7b03b25 Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Thu, 28 May 2020 16:33:04 +0200 Subject: [PATCH 3/3] Move MultiFromInputStream to IoMulti builder Signed-off-by: Daniel Kec --- .../io/helidon/common/reactive/IoMulti.java | 100 ++++++++++++++++++ .../io/helidon/common/reactive/Multi.java | 35 ------ .../common/reactive/MultiByteBuffer.java | 34 ------ .../common/reactive/MultiFromInputStream.java | 2 +- .../reactive/MultiFromInputStreamTest.java | 13 ++- ...MultiFromNotTrustedInputStreamTckTest.java | 5 +- .../MultiFromTrustedInputStreamTckTest.java | 4 +- .../jersey/InputStreamPublisherTest.java | 17 +-- .../webserver/ClassPathContentHandler.java | 7 +- 9 files changed, 131 insertions(+), 86 deletions(-) create mode 100644 common/reactive/src/main/java/io/helidon/common/reactive/IoMulti.java delete mode 100644 common/reactive/src/main/java/io/helidon/common/reactive/MultiByteBuffer.java diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/IoMulti.java b/common/reactive/src/main/java/io/helidon/common/reactive/IoMulti.java new file mode 100644 index 00000000000..5fc5d930437 --- /dev/null +++ b/common/reactive/src/main/java/io/helidon/common/reactive/IoMulti.java @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.common.reactive; + +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Objects; +import java.util.concurrent.ExecutorService; + +import io.helidon.common.Builder; + +/** + * Create reactive stream from standard IO resources. + */ +public interface IoMulti { + + /** + * Create a {@link Multi} instance that publishes {@link ByteBuffer}s from the given {@link InputStream}. + *

+ * {@link InputStream} is trusted not to block on read operations, in case it can't be assured use + * builder to specify executor for asynchronous waiting for blocking reads. + * {@code IoMulti.builder(is).executor(executorService).build()}. + * + * @param inputStream the Stream to publish + * @return Multi + * @throws NullPointerException if {@code stream} is {@code null} + */ + static Multi create(final InputStream inputStream) { + return IoMulti.builder(inputStream) + .build(); + } + + /** + * Creates a builder of the {@link Multi} from supplied {@link java.io.InputStream}. + * + * @param inputStream the Stream to publish + * @return the builder + */ + static MultiFromInputStreamBuilder builder(final InputStream inputStream) { + Objects.requireNonNull(inputStream); + return new MultiFromInputStreamBuilder(inputStream); + } + + final class MultiFromInputStreamBuilder implements Builder> { + + private int bufferSize = 1024; + private ExecutorService executor; + private final InputStream inputStream; + + MultiFromInputStreamBuilder(final InputStream inputStream) { + this.inputStream = inputStream; + } + + /** + * Set the size of {@link ByteBuffer}s being published. + * + * @param bufferSize size of the {@link ByteBuffer} + * @return Multi + */ + public MultiFromInputStreamBuilder byteBufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + /** + * If the {@code InputStream} can block in read method, use executor for asynchronous waiting. + * + * @param executor used for asynchronous waiting for blocking reads + * @return this builder + */ + public MultiFromInputStreamBuilder executor(final ExecutorService executor) { + Objects.requireNonNull(executor); + this.executor = executor; + return this; + } + + @Override + public Multi build() { + if (executor != null) { + return new MultiFromBlockingInputStream(inputStream, bufferSize, executor); + } + return new MultiFromInputStream(inputStream, bufferSize); + } + } +} diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java b/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java index 3bc52499070..f81f4d5e3c3 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java @@ -15,8 +15,6 @@ */ package io.helidon.common.reactive; -import java.io.InputStream; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -24,7 +22,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; @@ -233,38 +230,6 @@ static Multi from(Stream stream) { return new MultiFromStream<>(stream); } - /** - * Create a {@link Multi} instance that publishes {@link ByteBuffer}s from the given {@link InputStream}. - *

- * If {@link InputStream} can be trusted not to block on read operations, use faster - * {@link Multi#from(java.io.InputStream)}. - * - * @param inputStream the Stream to publish - * @param executor executor to use for waiting at {@link InputStream}'s blocking reads - * @return Multi - * @throws NullPointerException if {@code stream} is {@code null} - */ - static MultiByteBuffer from(InputStream inputStream, ExecutorService executor) { - Objects.requireNonNull(inputStream, "stream is null"); - Objects.requireNonNull(executor, "executor is null"); - return new MultiFromBlockingInputStream(inputStream, 4, executor); - } - - /** - * Create a {@link Multi} instance that publishes {@link ByteBuffer}s from the given {@link InputStream}. - *

- * {@link InputStream} is trusted not to block on read operations, in case it can't be assured use - * {@link Multi#from(java.io.InputStream, java.util.concurrent.ExecutorService)}. - * - * @param inputStream the Stream to publish - * @return Multi - * @throws NullPointerException if {@code stream} is {@code null} - */ - static MultiByteBuffer from(InputStream inputStream) { - Objects.requireNonNull(inputStream, "stream is null"); - return new MultiFromInputStream(inputStream, 4); - } - /** * Signal 0L, 1L and so on periodically to the downstream. *

diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiByteBuffer.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiByteBuffer.java deleted file mode 100644 index 5284c0539fc..00000000000 --- a/common/reactive/src/main/java/io/helidon/common/reactive/MultiByteBuffer.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2020 Oracle and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.helidon.common.reactive; - -import java.nio.ByteBuffer; - -/** - * Multi publishing stream of {@link ByteBuffer}s. - */ -public interface MultiByteBuffer extends Multi { - - /** - * Set the size of {@link ByteBuffer}s being published. - * - * @param bufferSize size of the {@link ByteBuffer} - * @return Multi - */ - Multi withByteBufferSize(int bufferSize); -} diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiFromInputStream.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiFromInputStream.java index b1222c862d0..04910b85878 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/MultiFromInputStream.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiFromInputStream.java @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.IntSupplier; -class MultiFromInputStream implements MultiByteBuffer { +class MultiFromInputStream implements Multi { private final InputStream inputStream; private IntSupplier bufferSizeSupplier; diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromInputStreamTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromInputStreamTest.java index 2a2532c9def..f3c0833afee 100644 --- a/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromInputStreamTest.java +++ b/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromInputStreamTest.java @@ -59,7 +59,7 @@ public void testInputStream() { InputStream is = new ByteArrayInputStream(initialArray); - List result = Multi.from(is) + List result = IoMulti.create(is) .flatMapIterable((ByteBuffer b) -> { List list = new LinkedList<>(); while (b.remaining() > 0) { @@ -75,15 +75,18 @@ public void testInputStream() { @RepeatedTest(value = 20, name = "buffer size {currentRepetition}") void longStringTrustedStream(RepetitionInfo repetitionInfo) { var bufferSize = repetitionInfo.getCurrentRepetition(); - longString(is -> Multi.from(is) - .withByteBufferSize(bufferSize)); + longString(is -> IoMulti.builder(is) + .byteBufferSize(bufferSize) + .build()); } @RepeatedTest(value = 20, name = "buffer size {currentRepetition}") void longStringNotTrustedStream(RepetitionInfo repetitionInfo) { var bufferSize = repetitionInfo.getCurrentRepetition(); - longString(is -> Multi.from(is, executorService) - .withByteBufferSize(bufferSize)); + longString(is -> IoMulti.builder(is) + .executor(executorService) + .byteBufferSize(bufferSize) + .build()); } private void longString(Function> pubCreator) { diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromNotTrustedInputStreamTckTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromNotTrustedInputStreamTckTest.java index ebc0d43fda1..2bd3a04b9e1 100644 --- a/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromNotTrustedInputStreamTckTest.java +++ b/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromNotTrustedInputStreamTckTest.java @@ -27,7 +27,10 @@ public class MultiFromNotTrustedInputStreamTckTest extends MultiFromTrustedInput static final int BUFFER_SIZE = 4; protected Multi getPublisher(InputStream is) { - return Multi.from(is, Executors.newFixedThreadPool(4)).withByteBufferSize(BUFFER_SIZE); + return IoMulti.builder(is) + .executor(Executors.newFixedThreadPool(4)) + .byteBufferSize(BUFFER_SIZE) + .build(); } } diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromTrustedInputStreamTckTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromTrustedInputStreamTckTest.java index 6a297be3d22..dad4a452875 100644 --- a/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromTrustedInputStreamTckTest.java +++ b/common/reactive/src/test/java/io/helidon/common/reactive/MultiFromTrustedInputStreamTckTest.java @@ -51,7 +51,9 @@ public int read() { } protected Multi getPublisher(InputStream is) { - return Multi.from(is).withByteBufferSize(BUFFER_SIZE); + return IoMulti.builder(is) + .byteBufferSize(BUFFER_SIZE) + .build(); } @Override diff --git a/security/integration/jersey/src/test/java/io/helidon/security/integration/jersey/InputStreamPublisherTest.java b/security/integration/jersey/src/test/java/io/helidon/security/integration/jersey/InputStreamPublisherTest.java index 9b11bfcf1a9..ef58771f887 100644 --- a/security/integration/jersey/src/test/java/io/helidon/security/integration/jersey/InputStreamPublisherTest.java +++ b/security/integration/jersey/src/test/java/io/helidon/security/integration/jersey/InputStreamPublisherTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; +import io.helidon.common.reactive.IoMulti; import io.helidon.common.reactive.Multi; import static org.hamcrest.CoreMatchers.is; @@ -39,8 +40,10 @@ public class InputStreamPublisherTest { @Test public void testSingle() throws InterruptedException { String teststring = "My text to publish with publisher"; - Multi p = Multi.from(new ByteArrayInputStream(teststring.getBytes(StandardCharsets.UTF_8))) - .withByteBufferSize(1024); + + Multi p = IoMulti.builder(new ByteArrayInputStream(teststring.getBytes(StandardCharsets.UTF_8))) + .byteBufferSize(1024) + .build(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); CountDownLatch cdl = new CountDownLatch(1); @@ -83,8 +86,9 @@ public void onComplete() { @Test public void testMultiple() throws InterruptedException { String teststring = "My text to publish with publisher"; - Multi p = Multi.from(new ByteArrayInputStream(teststring.getBytes(StandardCharsets.UTF_8))) - .withByteBufferSize(1); + Multi p = IoMulti.builder(new ByteArrayInputStream(teststring.getBytes(StandardCharsets.UTF_8))) + .byteBufferSize(1) + .build(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); CountDownLatch cdl = new CountDownLatch(1); @@ -134,8 +138,9 @@ public void testVeryLong() throws IOException, InterruptedException { } teststring = expectedResult.toString(); - Multi p = Multi.from(new ByteArrayInputStream(teststring.getBytes(StandardCharsets.UTF_8))) - .withByteBufferSize(2); + Multi p = IoMulti.builder(new ByteArrayInputStream(teststring.getBytes(StandardCharsets.UTF_8))) + .byteBufferSize(2) + .build(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); CountDownLatch cdl = new CountDownLatch(1); diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/ClassPathContentHandler.java b/webserver/webserver/src/main/java/io/helidon/webserver/ClassPathContentHandler.java index 8c07c7b4207..ec2f8d0bc38 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/ClassPathContentHandler.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/ClassPathContentHandler.java @@ -36,7 +36,7 @@ import io.helidon.common.http.DataChunk; import io.helidon.common.http.Http; -import io.helidon.common.reactive.Multi; +import io.helidon.common.reactive.IoMulti; /** * Handles static content from the classpath. @@ -195,8 +195,9 @@ private void sendUrlStream(Http.RequestMethod method, URL url, ServerRequest req } InputStream in = url.openStream(); - response.send(Multi.from(in) - .withByteBufferSize(2048) + response.send(IoMulti.builder(in) + .byteBufferSize(2048) + .build() .map(DataChunk::create)); }