Skip to content

Commit

Permalink
Support setting response timeout for individual requests in Netty HTT…
Browse files Browse the repository at this point in the history
…P client (#23244)

* Support setting response timeout for individual requests in Netty HTTP client

* Move timeout handlers to client
  • Loading branch information
srnagar authored Aug 1, 2021
1 parent 71ab070 commit 08a3b7f
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@

import java.net.URL;

/**
* Perf testing options for {@link HttpPipelineTest}.
*/
public class HttpPipelineOptions extends PerfStressOptions {
@Parameter(names = { "-u", "--url" }, description = "URL to fetch", required = true)
private URL url;

/**
* Returns the URL used by the HTTP request.
* @return The URL used by the HTTP request.
*/
public URL getUrl() {
return url;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,19 @@

import java.util.ArrayList;

/**
* Perf test for {@link HttpPipeline}.
*/
public class HttpPipelineTest extends PerfStressTest<HttpPipelineOptions> {
private static final int BUFFER_SIZE = 16 * 1024 * 1024;

private final HttpPipeline httpPipeline;
private final byte[] buffer = new byte[BUFFER_SIZE];

/**
* Creates an instance of the {@link HttpPipelineTest}.
* @param options options to configure the HTTP pipeline.
*/
public HttpPipelineTest(HttpPipelineOptions options) {
super(options);

Expand All @@ -29,7 +36,7 @@ public HttpPipelineTest(HttpPipelineOptions options) {
}

if (policies != null) {
ArrayList<HttpPipelinePolicy> policyList = new ArrayList<HttpPipelinePolicy>();
ArrayList<HttpPipelinePolicy> policyList = new ArrayList<>();
for (HttpPipelinePolicy policy : policies) {
policyList.add(policy);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ public static void run(Class<?> testClass, PerfStressOptions options) {
Flux.just(tests).flatMap(PerfStressTest::stopPlaybackAsync).blockLast();
playbackStatus.dispose();
}
}
finally {
} finally {
if (!options.isNoCleanup()) {
cleanupStatus = printStatus("=== Cleanup ===", () -> ".", false, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,20 @@
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClientResponse;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.ArrayList;
import javax.net.ssl.SSLException;

/**
* Represents the abstraction of a Performance test class.
*
* <p>
* The performance test class needs to extend this class. The test class should override {@link PerfStressTest#run()}
* and {@link PerfStressTest#runAsync()} methods and the synchronous and asynchronous test logic respectively.
* To add any test setup and logic the test class should override {@link PerfStressTest#globalSetupAsync()}
* and {@link PerfStressTest#globalCleanupAsync()} methods .
* The performance test class needs to extend this class. The test class should override {@link PerfStressTest#run()}
* and {@link PerfStressTest#runAsync()} methods and the synchronous and asynchronous test logic respectively.
* To add any test setup and logic the test class should override {@link PerfStressTest#globalSetupAsync()}
* and {@link PerfStressTest#globalCleanupAsync()} methods .
* </p>
*
*
* @param <TOptions> the options configured for the test.
*/
public abstract class PerfStressTest<TOptions extends PerfStressOptions> {
Expand All @@ -45,6 +41,7 @@ public abstract class PerfStressTest<TOptions extends PerfStressOptions> {
/**
* Creates an instance of performance test.
* @param options the options configured for the test.
* @throws IllegalStateException if SSL context cannot be created.
*/
public PerfStressTest(TOptions options) {
this.options = options;
Expand All @@ -54,45 +51,45 @@ public PerfStressTest(TOptions options) {
if (options.isInsecure()) {
try {
sslContext = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
}
catch (SSLException e) {
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
} catch (SSLException e) {
throw new IllegalStateException(e);
}

reactor.netty.http.client.HttpClient nettyHttpClient = reactor.netty.http.client.HttpClient.create()
.secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
.secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));

httpClient = new NettyAsyncHttpClientBuilder(nettyHttpClient).build();
}
else {
} else {
sslContext = null;
httpClient = null;
}

if (options.getTestProxy() != null) {
if (options.isInsecure()) {
recordPlaybackHttpClient = reactor.netty.http.client.HttpClient.create()
.secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
}
else {
.secure(sslContextSpec -> sslContextSpec.sslContext(sslContext));
} else {
recordPlaybackHttpClient = reactor.netty.http.client.HttpClient.create();
}

testProxyPolicy = new TestProxyPolicy(options.getTestProxy());
policies = Arrays.asList(testProxyPolicy);
}
else {
} else {
recordPlaybackHttpClient = null;
testProxyPolicy = null;
policies = null;
}
}

// Attempts to configure a ClientBuilder using reflection. If a ClientBuilder does not follow the standard convention,
// it can be configured manually using the "httpClient" and "policies" fields.
protected void ConfigureClientBuilder(Object clientBuilder) {
/**
* Attempts to configure a ClientBuilder using reflection. If a ClientBuilder does not follow the standard convention,
* it can be configured manually using the "httpClient" and "policies" fields.
* @param clientBuilder The client builder.
* @throws IllegalStateException If reflective access to get httpClient or addPolicy methods fail.
*/
protected void configureClientBuilder(Object clientBuilder) {
if (httpClient != null || policies != null) {
Class<?> clientBuilderClass = clientBuilder.getClass();

Expand All @@ -101,17 +98,16 @@ protected void ConfigureClientBuilder(Object clientBuilder) {
Method httpClientMethod = clientBuilderClass.getMethod("httpClient", HttpClient.class);
httpClientMethod.invoke(clientBuilder, httpClient);
}

if (policies != null) {
Method addPolicyMethod = clientBuilderClass.getMethod("addPolicy", HttpPipelinePolicy.class);
for (HttpPipelinePolicy policy : policies) {
addPolicyMethod.invoke(clientBuilder, policy);
}
}
}
catch (ReflectiveOperationException e) {
} catch (ReflectiveOperationException e) {
throw new IllegalStateException(e);
}
}
}
}

Expand All @@ -131,27 +127,30 @@ public Mono<Void> setupAsync() {
return Mono.empty();
}

/**
* Records responses and starts async tests in playback mode.
* @return An empty {@link Mono}.
*/
public Mono<Void> recordAndStartPlaybackAsync() {
return startRecordingAsync()
.doOnSuccess(x -> {
testProxyPolicy.setRecordingId(recordingId);
testProxyPolicy.setMode("record");
})
// Must use Mono.defer() to ensure fields are set from prior requests
.then(Mono.defer(() -> runSyncOrAsync()))
.then(Mono.defer(() -> stopRecordingAsync()))
.then(Mono.defer(() -> startPlaybackAsync()))
.doOnSuccess(x -> {
testProxyPolicy.setRecordingId(recordingId);
testProxyPolicy.setMode("playback");
});
.doOnSuccess(x -> {
testProxyPolicy.setRecordingId(recordingId);
testProxyPolicy.setMode("record");
})
// Must use Mono.defer() to ensure fields are set from prior requests
.then(Mono.defer(() -> runSyncOrAsync()))
.then(Mono.defer(() -> stopRecordingAsync()))
.then(Mono.defer(() -> startPlaybackAsync()))
.doOnSuccess(x -> {
testProxyPolicy.setRecordingId(recordingId);
testProxyPolicy.setMode("playback");
});
}

private Mono<Void> runSyncOrAsync() {
if (options.isSync()) {
return Mono.empty().then().doOnSuccess(x -> run());
}
else {
} else {
return runAsync();
}
}
Expand All @@ -167,20 +166,24 @@ private Mono<Void> runSyncOrAsync() {
*/
public abstract Mono<Void> runAsync();

/**
* Stops playback tests.
* @return An empty {@link Mono}.
*/
public Mono<Void> stopPlaybackAsync() {
return recordPlaybackHttpClient
.headers(h -> {
h.set("x-recording-id", recordingId);
h.set("x-purge-inmemory-recording", Boolean.toString(true));
})
.post()
.uri(options.getTestProxy().resolve("/playback/stop"))
.response()
.doOnSuccess(response -> {
testProxyPolicy.setMode(null);
testProxyPolicy.setRecordingId(null);
})
.then();
.headers(h -> {
h.set("x-recording-id", recordingId);
h.set("x-purge-inmemory-recording", Boolean.toString(true));
})
.post()
.uri(options.getTestProxy().resolve("/playback/stop"))
.response()
.doOnSuccess(response -> {
testProxyPolicy.setMode(null);
testProxyPolicy.setRecordingId(null);
})
.then();
}

/**
Expand All @@ -201,33 +204,33 @@ public Mono<Void> globalCleanupAsync() {

private Mono<Void> startRecordingAsync() {
return recordPlaybackHttpClient
.post()
.uri(options.getTestProxy().resolve("/record/start"))
.response()
.doOnNext(response -> {
recordingId = response.responseHeaders().get("x-recording-id");
})
.then();
.post()
.uri(options.getTestProxy().resolve("/record/start"))
.response()
.doOnNext(response -> {
recordingId = response.responseHeaders().get("x-recording-id");
})
.then();
}

private Mono<Void> stopRecordingAsync() {
return recordPlaybackHttpClient
.headers(h -> h.set("x-recording-id", recordingId))
.post()
.uri(options.getTestProxy().resolve("/record/stop"))
.response()
.then();
.headers(h -> h.set("x-recording-id", recordingId))
.post()
.uri(options.getTestProxy().resolve("/record/stop"))
.response()
.then();
}

private Mono<Void> startPlaybackAsync() {
return recordPlaybackHttpClient
.headers(h -> h.set("x-recording-id", recordingId))
.post()
.uri(options.getTestProxy().resolve("/playback/start"))
.response()
.doOnNext(response -> {
recordingId = response.responseHeaders().get("x-recording-id");
})
.then();
.headers(h -> h.set("x-recording-id", recordingId))
.post()
.uri(options.getTestProxy().resolve("/playback/start"))
.response()
.doOnNext(response -> {
recordingId = response.responseHeaders().get("x-recording-id");
})
.then();
}
}
Loading

0 comments on commit 08a3b7f

Please sign in to comment.