diff --git a/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java b/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java index 518c68523c..d1f9848631 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/rpc/ServerStream.java @@ -31,6 +31,8 @@ import com.google.api.core.InternalApi; import java.util.Iterator; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import javax.annotation.Nonnull; /** @@ -89,6 +91,15 @@ public Iterator iterator() { return iterator; } + /** + * Returns a sequential {@code Stream} with server responses as its source. + * + * @return a sequential {@code Stream} over the elements in server responses + */ + public Stream stream() { + return StreamSupport.stream(this.spliterator(), false); + } + /** * Returns true if the next call to the iterator's hasNext() or next() is guaranteed to be * nonblocking. diff --git a/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamTest.java b/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamTest.java index b0cdc30ce1..087c64ca48 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/rpc/ServerStreamTest.java @@ -41,6 +41,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -109,6 +110,31 @@ public List call() { Truth.assertThat(results).containsExactly(0, 1, 2, 3, 4); } + @Test + public void testMultipleItemStreamMethod() throws Exception { + Future producerFuture = + executor.submit( + () -> { + for (int i = 0; i < 5; i++) { + int requestCount = controller.popLastPull(); + + Truth.assertWithMessage("ServerStream should request one item at a time") + .that(requestCount) + .isEqualTo(1); + + stream.observer().onResponse(i); + } + stream.observer().onComplete(); + return null; + }); + Future> consumerFuture = + executor.submit(() -> stream.stream().collect(Collectors.toList())); + + producerFuture.get(60, TimeUnit.SECONDS); + List results = consumerFuture.get(); + Truth.assertThat(results).containsExactly(0, 1, 2, 3, 4); + } + @Test public void testEarlyTermination() throws Exception { Future taskFuture = diff --git a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java index e0dc152648..e8d22c2756 100644 --- a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java +++ b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITServerSideStreaming.java @@ -30,6 +30,7 @@ import com.google.showcase.v1beta1.it.util.TestClientInitializer; import java.util.ArrayList; import java.util.Iterator; +import java.util.stream.Collectors; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -71,6 +72,18 @@ public void testGrpc_receiveStreamedContent() { .inOrder(); } + @Test + public void testGrpc_receiveStreamedContentStreamAPI() { + String content = "The rain in Spain stays mainly on the plain!"; + ServerStream responseStream = + grpcClient.expandCallable().call(ExpandRequest.newBuilder().setContent(content).build()); + assertThat(responseStream.stream().map(EchoResponse::getContent).collect(Collectors.toList())) + .containsExactlyElementsIn( + ImmutableList.of( + "The", "rain", "in", "Spain", "stays", "mainly", "on", "the", "plain!")) + .inOrder(); + } + @Test public void testGrpc_serverError_receiveErrorAfterLastWordInStream() { String content = "The rain in Spain";