Skip to content

Commit

Permalink
Streaming requests (#1207)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardm-stripe authored Jun 25, 2021
1 parent df63533 commit ecb791f
Show file tree
Hide file tree
Showing 10 changed files with 367 additions and 87 deletions.
84 changes: 84 additions & 0 deletions src/main/java/com/stripe/net/AbstractStripeResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.stripe.net;

import static java.util.Objects.requireNonNull;

import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.experimental.NonFinal;

/** Common interface representing an HTTP response from Stripe. */
@Accessors(fluent = true)
abstract class AbstractStripeResponse<T> {
/** The HTTP status code of the response. */
int code;

/** The HTTP headers of the response. */
HttpHeaders headers;

/** The body of the response. */
T body;

public final int code() {
return this.code;
}

public final HttpHeaders headers() {
return this.headers;
}

public final T body() {
return this.body;
}

/** Number of times the request was retried. Used for internal tests only. */
@NonFinal
@Getter(AccessLevel.PACKAGE)
@Setter(AccessLevel.PACKAGE)
int numRetries;

/**
* Gets the date of the request, as returned by Stripe.
*
* @return the date of the request, as returned by Stripe
*/
public Instant date() {
Optional<String> dateStr = this.headers.firstValue("Date");
if (!dateStr.isPresent()) {
return null;
}
return ZonedDateTime.parse(dateStr.get(), DateTimeFormatter.RFC_1123_DATE_TIME).toInstant();
}

/**
* Gets the idempotency key of the request, as returned by Stripe.
*
* @return the idempotency key of the request, as returned by Stripe
*/
public String idempotencyKey() {
return this.headers.firstValue("Idempotency-Key").orElse(null);
}

/**
* Gets the ID of the request, as returned by Stripe.
*
* @return the ID of the request, as returned by Stripe
*/
public String requestId() {
return this.headers.firstValue("Request-Id").orElse(null);
}

protected AbstractStripeResponse(int code, HttpHeaders headers, T body) {
requireNonNull(headers);
requireNonNull(body);

this.code = code;
this.headers = headers;
this.body = body;
}
}
17 changes: 17 additions & 0 deletions src/main/java/com/stripe/net/ApiResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.stripe.model.StripeRawJsonObject;
import com.stripe.model.StripeRawJsonObjectDeserializer;
import com.stripe.util.StringUtils;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.Charset;
Expand Down Expand Up @@ -179,6 +180,22 @@ public static <T extends StripeObjectInterface> T request(
return ApiResource.stripeResponseGetter.request(method, url, params, clazz, options);
}

public static InputStream requestStream(
ApiResource.RequestMethod method, String url, ApiRequestParams params, RequestOptions options)
throws StripeException {
checkNullTypedParams(url, params);
return requestStream(method, url, params.toMap(), options);
}

public static InputStream requestStream(
ApiResource.RequestMethod method,
String url,
Map<String, Object> params,
RequestOptions options)
throws StripeException {
return ApiResource.stripeResponseGetter.requestStream(method, url, params, options);
}

public static <T extends StripeCollectionInterface<?>> T requestCollection(
String url, ApiRequestParams params, Class<T> clazz, RequestOptions options)
throws StripeException {
Expand Down
74 changes: 63 additions & 11 deletions src/main/java/com/stripe/net/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public abstract class HttpClient {
protected HttpClient() {}

/**
* Sends the given request to Stripe's API.
* Sends the given request to Stripe's API, buffering the response body into memory.
*
* @param request the request
* @return the response
Expand All @@ -38,13 +38,23 @@ protected HttpClient() {}
public abstract StripeResponse request(StripeRequest request) throws StripeException;

/**
* Sends the given request to Stripe's API, handling telemetry if not disabled.
* Sends the given request to Stripe's API, streaming the response body.
*
* @param request the request
* @return the response
* @throws StripeException If the request fails for any reason
*/
public StripeResponse requestWithTelemetry(StripeRequest request) throws StripeException {
public StripeResponseStream requestStream(StripeRequest request) throws StripeException {
throw new UnsupportedOperationException("requestStream is unimplemented for this HttpClient");
}

@FunctionalInterface
private interface RequestSendFunction<R> {
R apply(StripeRequest request) throws StripeException;
}

private <T extends AbstractStripeResponse<?>> T sendWithTelemetry(
StripeRequest request, RequestSendFunction<T> send) throws StripeException {
Optional<String> telemetryHeaderValue = requestTelemetry.getHeaderValue(request.headers());
if (telemetryHeaderValue.isPresent()) {
request =
Expand All @@ -53,7 +63,7 @@ public StripeResponse requestWithTelemetry(StripeRequest request) throws StripeE

Stopwatch stopwatch = Stopwatch.startNew();

StripeResponse response = this.request(request);
T response = send.apply(request);

stopwatch.stop();

Expand All @@ -63,23 +73,40 @@ public StripeResponse requestWithTelemetry(StripeRequest request) throws StripeE
}

/**
* Sends the given request to Stripe's API, retrying the request in cases of intermittent
* problems.
* Sends the given request to Stripe's API, handling telemetry if not disabled.
*
* @param request the request
* @return the response
* @throws StripeException If the request fails for any reason
*/
public StripeResponse requestWithRetries(StripeRequest request) throws StripeException {
public StripeResponse requestWithTelemetry(StripeRequest request) throws StripeException {
return sendWithTelemetry(request, this::request);
}

/**
* Sends the given request to Stripe's API, streaming the response, and handling telemetry if not
* disabled.
*
* @param request the request
* @return the response
* @throws StripeException If the request fails for any reason
*/
public StripeResponseStream requestStreamWithTelemetry(StripeRequest request)
throws StripeException {
return sendWithTelemetry(request, this::requestStream);
}

public <T extends AbstractStripeResponse<?>> T sendWithRetries(
StripeRequest request, RequestSendFunction<T> send) throws StripeException {
ApiConnectionException requestException = null;
StripeResponse response = null;
T response = null;
int retry = 0;

while (true) {
requestException = null;

try {
response = this.requestWithTelemetry(request);
response = send.apply(request);
} catch (ApiConnectionException e) {
requestException = e;
}
Expand All @@ -106,6 +133,31 @@ public StripeResponse requestWithRetries(StripeRequest request) throws StripeExc
return response;
}

/**
* Sends the given request to Stripe's API, retrying the request in cases of intermittent
* problems.
*
* @param request the request
* @return the response
* @throws StripeException If the request fails for any reason
*/
public StripeResponse requestWithRetries(StripeRequest request) throws StripeException {
return sendWithRetries(request, (r) -> this.requestWithTelemetry(r));
}

/**
* Sends the given request to Stripe's API, streaming the response, retrying the request in cases
* of intermittent problems.
*
* @param request the request
* @return the response
* @throws StripeException If the request fails for any reason
*/
public StripeResponseStream requestStreamWithRetries(StripeRequest request)
throws StripeException {
return sendWithRetries(request, (r) -> this.requestStreamWithTelemetry(r));
}

/**
* Builds the value of the {@code User-Agent} header.
*
Expand Down Expand Up @@ -165,8 +217,8 @@ private static String formatAppInfo(Map<String, String> info) {
return str;
}

private boolean shouldRetry(
int numRetries, StripeException exception, StripeRequest request, StripeResponse response) {
private <T extends AbstractStripeResponse<?>> boolean shouldRetry(
int numRetries, StripeException exception, StripeRequest request, T response) {
// Do not retry if we are out of retries.
if (numRetries >= request.options().getMaxNetworkRetries()) {
return false;
Expand Down
31 changes: 25 additions & 6 deletions src/main/java/com/stripe/net/HttpURLConnectionClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.stripe.Stripe;
import com.stripe.exception.ApiConnectionException;
import com.stripe.util.StreamUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -29,7 +28,7 @@ public HttpURLConnectionClient() {
* @throws ApiConnectionException if an error occurs when sending or receiving
*/
@Override
public StripeResponse request(StripeRequest request) throws ApiConnectionException {
public StripeResponseStream requestStream(StripeRequest request) throws ApiConnectionException {
try {
final HttpURLConnection conn = createStripeConnection(request);

Expand All @@ -43,12 +42,32 @@ public StripeResponse request(StripeRequest request) throws ApiConnectionExcepti
? conn.getInputStream()
: conn.getErrorStream();

final String responseBody = StreamUtils.readToEnd(responseStream, ApiResource.CHARSET);

responseStream.close();
return new StripeResponseStream(responseCode, headers, responseStream);

return new StripeResponse(responseCode, headers, responseBody);
} catch (IOException e) {
throw new ApiConnectionException(
String.format(
"IOException during API request to Stripe (%s): %s "
+ "Please check your internet connection and try again. If this problem persists,"
+ "you should check Stripe's service status at https://twitter.com/stripestatus,"
+ " or let us know at support@stripe.com.",
Stripe.getApiBase(), e.getMessage()),
e);
}
}

/**
* Sends the given request to Stripe's API, and returns a buffered response.
*
* @param request the request
* @return the response
* @throws ApiConnectionException if an error occurs when sending or receiving
*/
@Override
public StripeResponse request(StripeRequest request) throws ApiConnectionException {
final StripeResponseStream responseStream = requestStream(request);
try {
return responseStream.unstream();
} catch (IOException e) {
throw new ApiConnectionException(
String.format(
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/com/stripe/net/LiveStripeResponseGetter.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import com.stripe.Stripe;
import com.stripe.exception.ApiConnectionException;
import com.stripe.exception.ApiException;
import com.stripe.exception.AuthenticationException;
import com.stripe.exception.CardException;
Expand All @@ -20,6 +22,8 @@
import com.stripe.model.StripeObject;
import com.stripe.model.StripeObjectInterface;
import com.stripe.model.oauth.OAuthError;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

public class LiveStripeResponseGetter implements StripeResponseGetter {
Expand Down Expand Up @@ -73,6 +77,38 @@ public <T extends StripeObjectInterface> T request(
return resource;
}

@Override
public InputStream requestStream(
ApiResource.RequestMethod method,
String url,
Map<String, Object> params,
RequestOptions options)
throws StripeException {
StripeRequest request = new StripeRequest(method, url, params, options);
StripeResponseStream responseStream = httpClient.requestStreamWithRetries(request);

int responseCode = responseStream.code();

if (responseCode < 200 || responseCode >= 300) {
StripeResponse response;
try {
response = responseStream.unstream();
} catch (IOException e) {
throw new ApiConnectionException(
String.format(
"IOException during API request to Stripe (%s): %s "
+ "Please check your internet connection and try again. If this problem persists,"
+ "you should check Stripe's service status at https://twitter.com/stripestatus,"
+ " or let us know at support@stripe.com.",
Stripe.getApiBase(), e.getMessage()),
e);
}
handleApiError(response);
}

return responseStream.body();
}

@Override
public <T extends StripeObjectInterface> T oauthRequest(
ApiResource.RequestMethod method,
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/stripe/net/RequestTelemetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Optional<String> getHeaderValue(HttpHeaders headers) {
* @param response the Stripe response
* @param duration the request duration
*/
public void maybeEnqueueMetrics(StripeResponse response, Duration duration) {
public void maybeEnqueueMetrics(AbstractStripeResponse<?> response, Duration duration) {
if (!Stripe.enableTelemetry) {
return;
}
Expand Down
Loading

0 comments on commit ecb791f

Please sign in to comment.