diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromInputStreamPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromInputStreamPublisher.java index 7f772c6b1d..4ce0688d60 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromInputStreamPublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/FromInputStreamPublisher.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018-2021, 2023-2024 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -171,7 +171,7 @@ private void readAndDeliver(final Subscriber subscriber) { int available = stream.available(); if (available == 0) { // Work around InputStreams that don't strictly honor the 0 == EOF contract. - available = buffer != null ? buffer.length : 1; + available = buffer != null ? buffer.length : readChunkSize; } available = fillBufferAvoidingBlocking(available); emitSingleBuffer(subscriber); @@ -212,6 +212,8 @@ private void emitSingleBuffer(final Subscriber subscriber) { b = buffer; buffer = null; } else { + // this extra copy is necessary when we read the last chunk and total number of bytes read before EOF + // is less than guesstimated buffer size b = new byte[writeIdx]; arraycopy(buffer, 0, b, 0, writeIdx); } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/FromInputStreamPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/FromInputStreamPublisherTest.java index 64e37a0157..219e7f3197 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/FromInputStreamPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/FromInputStreamPublisherTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018-2019, 2021 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018-2021, 2023-2024 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.io.InputStream; @@ -200,7 +202,7 @@ void streamClosedAndErrorOnReadIOError() throws Exception { void streamClosedAndErrorOnDeliveryError() throws Exception { initChunkedStream(smallBuff, of(10), of(10)); - Subscriber sub = mock(Subscriber.class); + Subscriber sub = mock(Subscriber.class); doAnswer(inv -> { Subscription s = inv.getArgument(0); @@ -221,7 +223,7 @@ void streamClosedAndErrorOnDeliveryError() throws Exception { void streamClosedAndErrorOnDeliveryErrorOnce() throws Exception { initChunkedStream(smallBuff, ofAll(10), ofAll(10)); - Subscriber sub = mock(Subscriber.class); + Subscriber sub = mock(Subscriber.class); AtomicReference subRef = new AtomicReference<>(); doAnswer(inv -> { @@ -246,7 +248,7 @@ void streamClosedAndErrorOnDeliveryErrorOnce() throws Exception { void streamCanceledShouldCloseOnce() throws Exception { initChunkedStream(smallBuff, ofAll(10), ofAll(10)); - Subscriber sub = mock(Subscriber.class); + Subscriber sub = mock(Subscriber.class); doAnswer(inv -> { Subscription s = inv.getArgument(0); @@ -351,21 +353,70 @@ void completeStreamIfEOFObservedDuringReadFromOverEstimatedAvailability() throws } @Test - void dontFailOnInputStreamWithBrokenAvailableCall() throws Throwable { - initChunkedStream(bigBuff, of(5, 0, 0, 10, 5, 5, 5, 5, 0), - of(5, 1, 1, 10, 5, 5, 5, 5, 0)); + void readsAllBytesWhenAvailableNotImplemented() throws Throwable { + // constrain publisher to 10 byte chunks with no data availability to enforce inner loops until buffer drained + initChunkedStream(bigBuff, ofAll(0), ofAll(10)); - byte[][] items = { - new byte[]{0, 1, 2, 3, 4}, - new byte[]{5}, // avail == 0 -> override to 1 - new byte[]{6}, // avail == 0 -> override to 1 - new byte[]{7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, - new byte[]{17, 18, 19, 20, 21}, - new byte[]{22, 23, 24, 25, 26}, - new byte[]{27, 28, 29, 30, 31}, - new byte[]{32, 33, 34, 35, 36}, - }; + // expect single emitted item + // [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, + // 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, + // 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, + // 30, 31, 32, 33, 34, 35, 36] + + byte[][] items = chunked(bigBuff.length, bigBuff.length); + verifySuccess(items); + } + + @ParameterizedTest(name = "{displayName} [{index}] readChunkSize={0}") + @ValueSource(ints = {7, 1024}) + void doNotFailOnInputStreamWithBrokenAvailableCall(int readChunkSize) throws Throwable { + initChunkedStream(bigBuff, of(5, 0, 0, 10, 5, 5, 1, 0), + of(5, 7, 7, 10, 5, 5, 1, 0)); + pub = new FromInputStreamPublisher(inputStream, readChunkSize); + + if (readChunkSize > bigBuff.length) { + byte[][] items = { + new byte[]{0, 1, 2, 3, 4}, + // avail == 0 -> override to readChunkSize + new byte[]{5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, + 28, 29, 30, 31, 32, 33, 34, 35, 36}, + }; + verifySuccess(items); + } else { + byte[][] items = { + new byte[]{0, 1, 2, 3, 4}, + // avail == 0 -> override to readChunkSize + new byte[]{5, 6, 7, 8, 9, 10, 11}, + // avail == 0 -> override to readChunkSize + new byte[]{12, 13, 14, 15, 16, 17, 18}, + // readChunkSize < available + new byte[]{19, 20, 21, 22, 23, 24, 25}, + new byte[]{26, 27, 28, 29, 30}, + new byte[]{31, 32, 33, 34, 35}, + new byte[]{36}, + }; + verifySuccess(items); + } + } + + @ParameterizedTest(name = "{displayName} [{index}] chunkSize={0}") + @ValueSource(ints = {3, 5, 7}) + void readChunkSizeRespectedWhenAvailableNotImplemented(int chunkSize) throws Throwable { + initChunkedStream(bigBuff, ofAll(0), ofAll(chunkSize)); + int readChunkSize = 5; + pub = new FromInputStreamPublisher(inputStream, readChunkSize); + // expect 8 emitted items + // [ 0, 1, 2, 3, 4] + // [ 5, 6, 7, 8, 9] + // [10, 11, 12, 13, 14] + // [15, 16, 17, 18, 19] + // [20, 21, 22, 23, 24] + // [25, 26, 27, 28, 29] + // [30, 31, 32, 33, 34] + // [35, 36] + + byte[][] items = chunked(bigBuff.length, readChunkSize); verifySuccess(items); } @@ -400,11 +451,12 @@ void keepReadingWhenAvailabilityPermits() throws Throwable { verifySuccess(items); } - @Test - void repeatedReadingWhenAvailabilityRunsOut() throws Throwable { - // constrain publisher to 10 byte chunks with only 5 byte availability per chunk to enforce multiple outer loops - // simulating multiple calls to IS.available() - initChunkedStream(bigBuff, ofAll(5), ofAll(5)); // 5 byte chunks per available() call + @ParameterizedTest(name = "{displayName} [{index}] chunkSize={0}") + @ValueSource(ints = {3, 5, 7}) + void repeatedReadingWhenAvailabilityRunsOut(int chunkSize) throws Throwable { + // constrain publisher to chunkSize byte chunks with only 5 byte availability per chunk to enforce multiple + // outer loops simulating multiple calls to IS.available() + initChunkedStream(bigBuff, ofAll(5), ofAll(chunkSize)); // chunkSize byte chunks per available() call // expect 8 emitted items // [ 0, 1, 2, 3, 4] diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFromInputStreamTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFromInputStreamTckTest.java new file mode 100644 index 0000000000..1e439253d8 --- /dev/null +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherFromInputStreamTckTest.java @@ -0,0 +1,36 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.reactivestreams.tck; + +import io.servicetalk.concurrent.api.Publisher; + +import org.testng.annotations.Test; + +import java.io.ByteArrayInputStream; + +@Test +public class PublisherFromInputStreamTckTest extends AbstractPublisherTckTest { + + @Override + protected Publisher createServiceTalkPublisher(long elements) { + return Publisher.fromInputStream(new ByteArrayInputStream(new byte[(int) elements]), 1); + } + + @Override + public long maxElementsFromPublisher() { + return TckUtils.maxElementsFromPublisher(); + } +}