From 026d1bdb4a6ad02490733799b4ce4fdeec6f4f7e Mon Sep 17 00:00:00 2001 From: Michael Dowling Date: Fri, 10 Jan 2025 16:13:54 -0600 Subject: [PATCH] Properly adhere to Flow spec when converting flows To map the JavaHttpClient's Flow.Publisher> to our Flow.Publisher we have to buffer byte buffers until downstream subscribers request them. We were previously just calling onNext with each individual ByteBuffer for as many buffers as we were handed by the client's publisher. In addition to now queueing these byte buffers when necessary, this commit also adds a dedicated DataStream for the JavaHttpClientTransport to avoid a bunch of intermediate conversions we were doing previously to convert the publisher to a ByteBuffer or to an InputStream. I added some basic test cases and also tested manually calling some services, which is how I discovered we needed to update system properties for the client statically too. --- .../client/http/HttpClientDataStream.java | 129 ++++++++++++++++++ .../client/http/JavaHttpClientTransport.java | 65 ++++----- .../client/http/HttpClientDataStreamTest.java | 114 ++++++++++++++++ 3 files changed, 268 insertions(+), 40 deletions(-) create mode 100644 client-http/src/main/java/software/amazon/smithy/java/client/http/HttpClientDataStream.java create mode 100644 client-http/src/test/java/software/amazon/smithy/java/client/http/HttpClientDataStreamTest.java diff --git a/client-http/src/main/java/software/amazon/smithy/java/client/http/HttpClientDataStream.java b/client-http/src/main/java/software/amazon/smithy/java/client/http/HttpClientDataStream.java new file mode 100644 index 000000000..6f95fa17e --- /dev/null +++ b/client-http/src/main/java/software/amazon/smithy/java/client/http/HttpClientDataStream.java @@ -0,0 +1,129 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.client.http; + +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import software.amazon.smithy.java.io.datastream.DataStream; + +/** + * This class defers turning the HTTP client's response publisher into an adapted publisher unless required. + * + *

This class avoids needing to use multiple intermediate adapters to go from a flow that publishes a list of + * byte buffers to a flow that publishes single byte buffer. Instead, it directly implements asByteBuffer and + * asInputStream to use a more direct integration from the HTTP client library. + */ +record HttpClientDataStream( + Flow.Publisher> httpPublisher, + long contentLength, + String contentType) implements DataStream { + @Override + public boolean isReplayable() { + return false; + } + + @Override + public CompletableFuture asByteBuffer() { + var p = java.net.http.HttpResponse.BodySubscribers.ofByteArray(); + httpPublisher.subscribe(p); + return p.getBody().thenApply(ByteBuffer::wrap).toCompletableFuture(); + } + + @Override + public CompletableFuture asInputStream() { + var p = java.net.http.HttpResponse.BodySubscribers.ofInputStream(); + httpPublisher.subscribe(p); + return p.getBody().toCompletableFuture(); + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + // Adapt the "Flow.Subscriber" to "Flow.Subscriber". + httpPublisher.subscribe(new BbListToBbSubscriber(subscriber)); + } + + private static final class BbListToBbSubscriber implements Flow.Subscriber> { + private final Flow.Subscriber subscriber; + + BbListToBbSubscriber(Flow.Subscriber subscriber) { + this.subscriber = subscriber; + } + + private Flow.Subscription upstreamSubscription; + private final Queue queue = new ConcurrentLinkedQueue<>(); + private final AtomicLong demand = new AtomicLong(0); + private final AtomicBoolean senderFinished = new AtomicBoolean(false); + + @Override + public void onSubscribe(Flow.Subscription subscription) { + upstreamSubscription = subscription; + + subscriber.onSubscribe(new Flow.Subscription() { + @Override + public void request(long n) { + demand.addAndGet(n); + drainAndRequest(); + } + + @Override + public void cancel() { + upstreamSubscription.cancel(); + } + }); + } + + @Override + public void onError(Throwable throwable) { + subscriber.onError(throwable); + } + + @Override + public void onNext(List item) { + queue.addAll(item); + drainAndRequest(); + } + + @Override + public void onComplete() { + // The sender is done sending us bytes, so when our queue is empty, emit onComplete downstream. + senderFinished.set(true); + drain(); + } + + private void drain() { + try { + while (!queue.isEmpty() && demand.get() > 0) { + ByteBuffer buffer = queue.poll(); + if (buffer != null) { + subscriber.onNext(buffer); + demand.decrementAndGet(); + } + } + // When we have no more buffered BBs and the sender has signaled they're done, then complete downstream. + if (queue.isEmpty() && senderFinished.get()) { + subscriber.onComplete(); + } + } catch (Exception e) { + subscriber.onError(e); + } + } + + private void drainAndRequest() { + drain(); + + if (queue.isEmpty() && !senderFinished.get()) { + upstreamSubscription.request(demand.get()); + } + } + } +} diff --git a/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientTransport.java b/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientTransport.java index 87ce313f6..ca881bb4e 100644 --- a/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientTransport.java +++ b/client-http/src/main/java/software/amazon/smithy/java/client/http/JavaHttpClientTransport.java @@ -32,16 +32,12 @@ public class JavaHttpClientTransport implements ClientTransport sendRequest(java.net.http.HttpRequest re } private HttpResponse createSmithyResponse(java.net.http.HttpResponse>> response) { - LOGGER.trace("Got response: {}; headers: {}", response, response.headers().map()); + var headerMap = response.headers().map(); + LOGGER.trace("Got response: {}; headers: {}", response, headerMap); + + var headers = HttpHeaders.of(headerMap); + var length = headers.contentLength(); + long adaptedLength = length == null ? -1 : length; + return HttpResponse.builder() .httpVersion(javaToSmithyVersion(response.version())) .statusCode(response.statusCode()) - .headers(HttpHeaders.of(response.headers().map())) - .body(new ListByteBufferToByteBuffer(response.body())) // Flatten the List to ByteBuffer. + .headers(headers) + .body(new HttpClientDataStream(response.body(), adaptedLength, headers.contentType())) .build(); } @@ -148,35 +162,6 @@ private static HttpVersion javaToSmithyVersion(HttpClient.Version version) { }; } - private record ListByteBufferToByteBuffer(Flow.Publisher> originalPublisher) - implements Flow.Publisher { - @Override - public void subscribe(Flow.Subscriber subscriber) { - originalPublisher.subscribe(new Flow.Subscriber<>() { - @Override - public void onSubscribe(Flow.Subscription subscription) { - subscriber.onSubscribe(subscription); - } - - @Override - public void onNext(List item) { - // TODO: subscriber.onNext should only be called as many times as requested by the subscription - item.forEach(subscriber::onNext); - } - - @Override - public void onError(Throwable throwable) { - subscriber.onError(throwable); - } - - @Override - public void onComplete() { - subscriber.onComplete(); - } - }); - } - } - public static final class Factory implements ClientTransportFactory { @Override public String name() { diff --git a/client-http/src/test/java/software/amazon/smithy/java/client/http/HttpClientDataStreamTest.java b/client-http/src/test/java/software/amazon/smithy/java/client/http/HttpClientDataStreamTest.java new file mode 100644 index 000000000..7d6358ea5 --- /dev/null +++ b/client-http/src/test/java/software/amazon/smithy/java/client/http/HttpClientDataStreamTest.java @@ -0,0 +1,114 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.client.http; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow; +import java.util.concurrent.SubmissionPublisher; +import org.junit.jupiter.api.Test; + +public class HttpClientDataStreamTest { + + private static List> createCannedBuffers() { + return List.of( + List.of(ByteBuffer.wrap("{\"hi\":".getBytes(StandardCharsets.UTF_8))), + List.of( + ByteBuffer.wrap("[1, ".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("2]".getBytes(StandardCharsets.UTF_8)) + ), + List.of(ByteBuffer.wrap("}".getBytes(StandardCharsets.UTF_8))) + ); + } + + private static final class CannedPublisher extends SubmissionPublisher> { + void pushData(List> data) { + data.forEach(this::submit); + close(); + } + } + + @Test + public void convertsToBb() throws Exception { + var httpPublisher = new CannedPublisher(); + var ds = new HttpClientDataStream(httpPublisher, 13, "application/json"); + assertThat(ds.contentType(), equalTo("application/json")); + assertThat(ds.contentLength(), equalTo(13L)); + + var cf = ds.asByteBuffer(); + httpPublisher.pushData(createCannedBuffers()); + + var bb = cf.get(); + assertThat(bb.remaining(), equalTo(13)); + assertThat(new String(bb.array(), StandardCharsets.UTF_8), equalTo("{\"hi\":[1, 2]}")); + } + + @Test + public void convertsToInputStream() throws Exception { + var httpPublisher = new CannedPublisher(); + var ds = new HttpClientDataStream(httpPublisher, 13, "application/json"); + var cf = ds.asInputStream(); + httpPublisher.pushData(createCannedBuffers()); + + var is = cf.get(); + assertThat(new String(is.readAllBytes(), StandardCharsets.UTF_8), equalTo("{\"hi\":[1, 2]}")); + } + + @Test + public void convertsToPublisher() throws Exception { + var httpPublisher = new CannedPublisher(); + var ds = new HttpClientDataStream(httpPublisher, 13, "application/json"); + + var collector = new CollectingSubscriber(); + ds.subscribe(collector); + httpPublisher.pushData(createCannedBuffers()); + var results = collector.getResult().get(); + + assertThat(results, equalTo("{\"hi\":[1, 2]}")); + } + + public static final class CollectingSubscriber implements Flow.Subscriber { + private final List buffers = Collections.synchronizedList(new ArrayList<>()); + private final CompletableFuture result = new CompletableFuture<>(); + private Flow.Subscription subscription; + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(ByteBuffer item) { + buffers.add(new String(item.array(), StandardCharsets.UTF_8)); + } + + @Override + public void onError(Throwable throwable) { + result.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + StringBuilder builder = new StringBuilder(); + for (String buffer : buffers) { + builder.append(buffer); + } + result.complete(builder.toString()); + } + + public CompletableFuture getResult() { + return result; + } + } +}