Skip to content

Commit

Permalink
Refactor two-client usage.
Browse files Browse the repository at this point in the history
Signed-off-by: dblock <dblock@amazon.com>
  • Loading branch information
dblock committed Jan 18, 2023
1 parent 04beb24 commit 948399d
Showing 1 changed file with 26 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.utils.SdkAutoCloseable;

import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -70,8 +71,7 @@ public class AwsSdk2Transport implements OpenSearchTransport {
public static final Integer DEFAULT_REQUEST_COMPRESSION_SIZE = 8192;

private static final byte[] NO_BYTES = new byte[0];
private final SdkHttpClient httpClient;
private final SdkAsyncHttpClient asyncHttpClient;
private final SdkAutoCloseable httpClient;
private final String host;
private final Region signingRegion;
private final JsonpMapper defaultMapper;
Expand All @@ -92,33 +92,11 @@ public class AwsSdk2Transport implements OpenSearchTransport {
* compression options, etc.
*/
public AwsSdk2Transport(
@Nonnull SdkHttpClient httpClient,
@Nonnull SdkAutoCloseable httpClient,
@Nonnull String host,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(httpClient, null, host, signingRegion, options);
}

/**
* Create an {@link OpenSearchTransport} with an ASYNCHRONOUS AWS Http client
* <p>
* Note that synchronous OpenSearch requests sent through this transport will be dispatched
* using the asynchronous client, but the calling thread will block until they are complete.
*
* @param asyncHttpClient HTTP client to use for OpenSearch requests
* @param host The target host
* @param signingRegion The AWS region for which requests will be signed. This should typically match
* the region in `host`.
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@Nonnull SdkAsyncHttpClient asyncHttpClient,
@Nonnull String host,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(null, asyncHttpClient, host, signingRegion, options);
this(httpClient, host, "es", signingRegion, options);
}

/**
Expand All @@ -127,28 +105,22 @@ public AwsSdk2Transport(
* The synchronous client will be used for synchronous OpenSearch requests, and the asynchronous client
* will be used for asynchronous HTTP requests.
*
* @param httpClient HTTP client to use for OpenSearch requests
* @param asyncHttpClient HTTP client to use for synchronous OpenSearch requests
* @param host The fully qualified domain name to connect to
* @param signingRegion The AWS region for which requests will be signed. This should typically match
* the region in `host`.
* @param options Options that apply to all requests. Can be null. Create with
* @param httpClient HTTP client to use for OpenSearch requests.
* @param host The fully qualified domain name to connect to.
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@CheckForNull SdkHttpClient httpClient,
@CheckForNull SdkAsyncHttpClient asyncHttpClient,
@CheckForNull SdkAutoCloseable httpClient,
@Nonnull String host,
@Nonnull String signingServiceName,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
if (httpClient == null && asyncHttpClient == null)
{
throw new IllegalArgumentException("At least one SdkHttpClient or SdkAsyncHttpClient must be provided");
}
Objects.requireNonNull(host, "Target OpenSearch service host must not be null");
this.httpClient = httpClient;
this.asyncHttpClient = asyncHttpClient;
this.host = host;
this.signingRegion = signingRegion;
this.transportOptions = options != null ? options : AwsSdk2TransportOptions.builder().build();
Expand All @@ -167,11 +139,11 @@ public <RequestT, ResponseT, ErrorT> ResponseT performRequest(
OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options);
SdkHttpFullRequest clientReq = prepareRequest(request, endpoint, options, requestBody);

if (httpClient != null) {
return executeSync(clientReq, endpoint, options);
} else {
if (httpClient instanceof SdkHttpClient) {
return executeSync((SdkHttpClient) httpClient, clientReq, endpoint, options);
} else if (httpClient instanceof SdkAsyncHttpClient) {
try {
return executeAsync(clientReq, requestBody, endpoint, options).get();
return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options).get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null) {
Expand All @@ -187,6 +159,8 @@ public <RequestT, ResponseT, ErrorT> ResponseT performRequest(
} catch (InterruptedException e) {
throw new IOException("HttpRequest was interrupted", e);
}
} else {
throw new IOException("invalid httpClient: " + httpClient);
}
}

Expand All @@ -199,11 +173,13 @@ public <RequestT, ResponseT, ErrorT> CompletableFuture<ResponseT> performRequest
try {
OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options);
SdkHttpFullRequest clientReq = prepareRequest(request, endpoint, options, requestBody);
if (asyncHttpClient != null) {
return executeAsync(clientReq, requestBody, endpoint, options);
} else {
ResponseT result = executeSync(clientReq, endpoint, options);
if (httpClient instanceof SdkAsyncHttpClient) {
return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options);
} else if (httpClient instanceof SdkHttpClient) {
ResponseT result = executeSync((SdkHttpClient) httpClient, clientReq, endpoint, options);
return CompletableFuture.completedFuture(result);
} else {
throw new IOException("invalid httpClient: " + httpClient);
}
} catch (Throwable e) {
CompletableFuture<ResponseT> cf = new CompletableFuture<>();
Expand Down Expand Up @@ -348,6 +324,7 @@ private void applyOptionsHeaders(SdkHttpFullRequest.Builder builder, TransportOp
}

private <ResponseT> ResponseT executeSync(
SdkHttpClient syncHttpClient,
SdkHttpFullRequest httpRequest,
Endpoint<?, ResponseT, ?> endpoint,
TransportOptions options
Expand All @@ -357,7 +334,7 @@ private <ResponseT> ResponseT executeSync(
if (httpRequest.contentStreamProvider().isPresent()) {
executeRequest.contentStreamProvider(httpRequest.contentStreamProvider().get());
}
HttpExecuteResponse executeResponse = httpClient.prepareRequest(executeRequest.build()).call();
HttpExecuteResponse executeResponse = syncHttpClient.prepareRequest(executeRequest.build()).call();
AbortableInputStream bodyStream = null;
try {
bodyStream = executeResponse.responseBody().orElse(null);
Expand All @@ -371,6 +348,7 @@ private <ResponseT> ResponseT executeSync(
}

private <ResponseT> CompletableFuture<ResponseT> executeAsync(
SdkAsyncHttpClient asyncHttpClient,
SdkHttpFullRequest httpRequest,
@CheckForNull OpenSearchRequestBodyBuffer requestBody,
Endpoint<?, ResponseT, ?> endpoint,
Expand Down Expand Up @@ -473,7 +451,6 @@ private <ResponseT, ErrorT> ResponseT parseResponse(
ResponseT response = (ResponseT) new BooleanResponse(bep.getResult(statusCode));
return response;
} else if (endpoint instanceof JsonEndpoint) {
@SuppressWarnings("unchecked")
JsonEndpoint<?, ResponseT, ?> jsonEndpoint = (JsonEndpoint<?, ResponseT, ?>) endpoint;
// Successful response
ResponseT response = null;
Expand Down

0 comments on commit 948399d

Please sign in to comment.