Skip to content

Commit

Permalink
Properly adhere to Flow spec when converting flows
Browse files Browse the repository at this point in the history
To map the JavaHttpClient's Flow.Publisher<List<ByteBuffer>> to our
Flow.Publisher<ByteBuffer> we have to buffer byte buffers until
downstream subscribers request them. We were previously just calling
onNext with each individual ByteBuffer for as many buffers as we were
handed by the client's publisher.

In addition to now queueing these byte buffers when necessary, this
commit also adds a dedicated DataStream for the JavaHttpClientTransport
to avoid a bunch of intermediate conversions we were doing previously to
convert the publisher to a ByteBuffer or to an InputStream.

I added some basic test cases and also tested manually calling some
services, which is how I discovered we needed to update system
properties for the client statically too.
  • Loading branch information
mtdowling committed Jan 10, 2025
1 parent 7ca2f27 commit 026d1bd
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package software.amazon.smithy.java.client.http;

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import software.amazon.smithy.java.io.datastream.DataStream;

/**
* This class defers turning the HTTP client's response publisher into an adapted publisher unless required.
*
* <p>This class avoids needing to use multiple intermediate adapters to go from a flow that publishes a list of
* byte buffers to a flow that publishes single byte buffer. Instead, it directly implements asByteBuffer and
* asInputStream to use a more direct integration from the HTTP client library.
*/
record HttpClientDataStream(
Flow.Publisher<List<ByteBuffer>> httpPublisher,
long contentLength,
String contentType) implements DataStream {
@Override
public boolean isReplayable() {
return false;
}

@Override
public CompletableFuture<ByteBuffer> asByteBuffer() {
var p = java.net.http.HttpResponse.BodySubscribers.ofByteArray();
httpPublisher.subscribe(p);
return p.getBody().thenApply(ByteBuffer::wrap).toCompletableFuture();
}

@Override
public CompletableFuture<InputStream> asInputStream() {
var p = java.net.http.HttpResponse.BodySubscribers.ofInputStream();
httpPublisher.subscribe(p);
return p.getBody().toCompletableFuture();
}

@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
// Adapt the "Flow.Subscriber<List<ByteBuffer>" to "Flow.Subscriber<ByteBuffer>".
httpPublisher.subscribe(new BbListToBbSubscriber(subscriber));
}

private static final class BbListToBbSubscriber implements Flow.Subscriber<List<ByteBuffer>> {
private final Flow.Subscriber<? super ByteBuffer> subscriber;

BbListToBbSubscriber(Flow.Subscriber<? super ByteBuffer> subscriber) {
this.subscriber = subscriber;
}

private Flow.Subscription upstreamSubscription;
private final Queue<ByteBuffer> queue = new ConcurrentLinkedQueue<>();
private final AtomicLong demand = new AtomicLong(0);
private final AtomicBoolean senderFinished = new AtomicBoolean(false);

@Override
public void onSubscribe(Flow.Subscription subscription) {
upstreamSubscription = subscription;

subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) {
demand.addAndGet(n);
drainAndRequest();
}

@Override
public void cancel() {
upstreamSubscription.cancel();
}
});
}

@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}

@Override
public void onNext(List<ByteBuffer> item) {
queue.addAll(item);
drainAndRequest();
}

@Override
public void onComplete() {
// The sender is done sending us bytes, so when our queue is empty, emit onComplete downstream.
senderFinished.set(true);
drain();
}

private void drain() {
try {
while (!queue.isEmpty() && demand.get() > 0) {
ByteBuffer buffer = queue.poll();
if (buffer != null) {
subscriber.onNext(buffer);
demand.decrementAndGet();
}
}
// When we have no more buffered BBs and the sender has signaled they're done, then complete downstream.
if (queue.isEmpty() && senderFinished.get()) {
subscriber.onComplete();
}
} catch (Exception e) {
subscriber.onError(e);
}
}

private void drainAndRequest() {
drain();

if (queue.isEmpty() && !senderFinished.get()) {
upstreamSubscription.request(demand.get());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,12 @@ public class JavaHttpClientTransport implements ClientTransport<HttpRequest, Htt
private static final InternalLogger LOGGER = InternalLogger.getLogger(JavaHttpClientTransport.class);
private final HttpClient client;

public JavaHttpClientTransport() {
this(HttpClient.newHttpClient());
static {
// For some reason, this can't just be done in the constructor to always take effect.
setHostProperties();
}

/**
* @param client Java client to use.
*/
public JavaHttpClientTransport(HttpClient client) {
this.client = client;

private static void setHostProperties() {
// Allow clients to set Host header. This has to be done using a system property and can't be done per/client.
var currentValues = System.getProperty("jdk.httpclient.allowRestrictedHeaders");
if (currentValues == null || currentValues.isEmpty()) {
Expand All @@ -51,6 +47,18 @@ public JavaHttpClientTransport(HttpClient client) {
}
}

public JavaHttpClientTransport() {
this(HttpClient.newHttpClient());
}

/**
* @param client Java client to use.
*/
public JavaHttpClientTransport(HttpClient client) {
this.client = client;
setHostProperties();
}

private static boolean containsHost(String currentValues) {
int length = currentValues.length();
for (int i = 0; i < length; i++) {
Expand Down Expand Up @@ -123,12 +131,18 @@ private CompletableFuture<HttpResponse> sendRequest(java.net.http.HttpRequest re
}

private HttpResponse createSmithyResponse(java.net.http.HttpResponse<Flow.Publisher<List<ByteBuffer>>> response) {
LOGGER.trace("Got response: {}; headers: {}", response, response.headers().map());
var headerMap = response.headers().map();
LOGGER.trace("Got response: {}; headers: {}", response, headerMap);

var headers = HttpHeaders.of(headerMap);
var length = headers.contentLength();
long adaptedLength = length == null ? -1 : length;

return HttpResponse.builder()
.httpVersion(javaToSmithyVersion(response.version()))
.statusCode(response.statusCode())
.headers(HttpHeaders.of(response.headers().map()))
.body(new ListByteBufferToByteBuffer(response.body())) // Flatten the List<ByteBuffer> to ByteBuffer.
.headers(headers)
.body(new HttpClientDataStream(response.body(), adaptedLength, headers.contentType()))
.build();
}

Expand All @@ -148,35 +162,6 @@ private static HttpVersion javaToSmithyVersion(HttpClient.Version version) {
};
}

private record ListByteBufferToByteBuffer(Flow.Publisher<List<ByteBuffer>> originalPublisher)
implements Flow.Publisher<ByteBuffer> {
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
originalPublisher.subscribe(new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscriber.onSubscribe(subscription);
}

@Override
public void onNext(List<ByteBuffer> item) {
// TODO: subscriber.onNext should only be called as many times as requested by the subscription
item.forEach(subscriber::onNext);
}

@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}

@Override
public void onComplete() {
subscriber.onComplete();
}
});
}
}

public static final class Factory implements ClientTransportFactory<HttpRequest, HttpResponse> {
@Override
public String name() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package software.amazon.smithy.java.client.http;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import org.junit.jupiter.api.Test;

public class HttpClientDataStreamTest {

private static List<List<ByteBuffer>> createCannedBuffers() {
return List.of(
List.of(ByteBuffer.wrap("{\"hi\":".getBytes(StandardCharsets.UTF_8))),
List.of(
ByteBuffer.wrap("[1, ".getBytes(StandardCharsets.UTF_8)),
ByteBuffer.wrap("2]".getBytes(StandardCharsets.UTF_8))
),
List.of(ByteBuffer.wrap("}".getBytes(StandardCharsets.UTF_8)))
);
}

private static final class CannedPublisher extends SubmissionPublisher<List<ByteBuffer>> {
void pushData(List<List<ByteBuffer>> data) {
data.forEach(this::submit);
close();
}
}

@Test
public void convertsToBb() throws Exception {
var httpPublisher = new CannedPublisher();
var ds = new HttpClientDataStream(httpPublisher, 13, "application/json");
assertThat(ds.contentType(), equalTo("application/json"));
assertThat(ds.contentLength(), equalTo(13L));

var cf = ds.asByteBuffer();
httpPublisher.pushData(createCannedBuffers());

var bb = cf.get();
assertThat(bb.remaining(), equalTo(13));
assertThat(new String(bb.array(), StandardCharsets.UTF_8), equalTo("{\"hi\":[1, 2]}"));
}

@Test
public void convertsToInputStream() throws Exception {
var httpPublisher = new CannedPublisher();
var ds = new HttpClientDataStream(httpPublisher, 13, "application/json");
var cf = ds.asInputStream();
httpPublisher.pushData(createCannedBuffers());

var is = cf.get();
assertThat(new String(is.readAllBytes(), StandardCharsets.UTF_8), equalTo("{\"hi\":[1, 2]}"));
}

@Test
public void convertsToPublisher() throws Exception {
var httpPublisher = new CannedPublisher();
var ds = new HttpClientDataStream(httpPublisher, 13, "application/json");

var collector = new CollectingSubscriber();
ds.subscribe(collector);
httpPublisher.pushData(createCannedBuffers());
var results = collector.getResult().get();

assertThat(results, equalTo("{\"hi\":[1, 2]}"));
}

public static final class CollectingSubscriber implements Flow.Subscriber<ByteBuffer> {
private final List<String> buffers = Collections.synchronizedList(new ArrayList<>());
private final CompletableFuture<String> result = new CompletableFuture<>();
private Flow.Subscription subscription;

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(ByteBuffer item) {
buffers.add(new String(item.array(), StandardCharsets.UTF_8));
}

@Override
public void onError(Throwable throwable) {
result.completeExceptionally(throwable);
}

@Override
public void onComplete() {
StringBuilder builder = new StringBuilder();
for (String buffer : buffers) {
builder.append(buffer);
}
result.complete(builder.toString());
}

public CompletableFuture<String> getResult() {
return result;
}
}
}

0 comments on commit 026d1bd

Please sign in to comment.