Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make VertxHttpExporter more robust #39020

Merged
merged 1 commit into from
Feb 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;
Expand All @@ -22,6 +24,7 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.quarkus.vertx.core.runtime.BufferOutputStream;
import io.smallrye.mutiny.Uni;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand All @@ -38,6 +41,8 @@ final class VertxHttpExporter implements SpanExporter {
private static final Logger internalLogger = Logger.getLogger(VertxHttpExporter.class.getName());
private static final ThrottlingLogger logger = new ThrottlingLogger(internalLogger);

private static final int MAX_ATTEMPTS = 3;

private final HttpExporter<TraceRequestMarshaler> delegate;

VertxHttpExporter(HttpExporter<TraceRequestMarshaler> delegate) {
Expand Down Expand Up @@ -110,75 +115,35 @@ private static String determineBasePath(URI baseUri) {
@Override
public void send(Consumer<OutputStream> marshaler,
int contentLength,
Consumer<Response> onResponse,
Consumer<Response> onHttpResponseRead,
Consumer<Throwable> onError) {

client.request(HttpMethod.POST, basePath + TRACES_PATH)
.onSuccess(new Handler<>() {
@Override
public void handle(HttpClientRequest request) {

HttpClientRequest clientRequest = request.response(new Handler<>() {
@Override
public void handle(AsyncResult<HttpClientResponse> callResult) {
if (callResult.succeeded()) {
HttpClientResponse clientResponse = callResult.result();
clientResponse.body(new Handler<>() {
@Override
public void handle(AsyncResult<Buffer> bodyResult) {
if (bodyResult.succeeded()) {
onResponse.accept(new Response() {
@Override
public int statusCode() {
return clientResponse.statusCode();
}

@Override
public String statusMessage() {
return clientResponse.statusMessage();
}

@Override
public byte[] responseBody() {
return bodyResult.result().getBytes();
}
});
} else {
onError.accept(bodyResult.cause());
}
}
});
} else {
onError.accept(callResult.cause());
}
}
})
.putHeader("Content-Type", contentType);

Buffer buffer = Buffer.buffer(contentLength);
OutputStream os = new BufferOutputStream(buffer);
if (compressionEnabled) {
clientRequest.putHeader("Content-Encoding", "gzip");
try (var gzos = new GZIPOutputStream(os)) {
marshaler.accept(gzos);
} catch (IOException e) {
throw new IllegalStateException(e);
}
} else {
marshaler.accept(os);
}

if (!headers.isEmpty()) {
for (var entry : headers.entrySet()) {
clientRequest.putHeader(entry.getKey(), entry.getValue());
}
}

clientRequest.send(buffer);
String requestURI = basePath + TRACES_PATH;
var clientRequestSuccessHandler = new ClientRequestSuccessHandler(client, requestURI, headers, compressionEnabled,
contentType,
contentLength, onHttpResponseRead,
onError, marshaler, 1);
initiateSend(client, requestURI, MAX_ATTEMPTS, clientRequestSuccessHandler, onError);
}

private static void initiateSend(HttpClient client, String requestURI,
int numberOfAttempts,
Handler<HttpClientRequest> clientRequestSuccessHandler,
Consumer<Throwable> onError) {
Uni.createFrom().completionStage(new Supplier<CompletionStage<HttpClientRequest>>() {
@Override
public CompletionStage<HttpClientRequest> get() {
return client.request(HttpMethod.POST, requestURI).toCompletionStage();
}
}).onFailure().retry()
.withBackOff(Duration.ofMillis(100))
.atMost(numberOfAttempts)
.subscribe().with(new Consumer<>() {
@Override
public void accept(HttpClientRequest request) {
clientRequestSuccessHandler.handle(request);
}
})
.onFailure(onError::accept);
}, onError);
}

@Override
Expand All @@ -204,5 +169,134 @@ public void handle(Throwable event) {
});
return shutdownResult;
}

private static class ClientRequestSuccessHandler implements Handler<HttpClientRequest> {
private final HttpClient client;
private final String requestURI;
private final Map<String, String> headers;
private final boolean compressionEnabled;
private final String contentType;
private final int contentLength;
private final Consumer<Response> onHttpResponseRead;
private final Consumer<Throwable> onError;
private final Consumer<OutputStream> marshaler;

private final int attemptNumber;

public ClientRequestSuccessHandler(HttpClient client,
String requestURI, Map<String, String> headers,
boolean compressionEnabled,
String contentType,
int contentLength,
Consumer<Response> onHttpResponseRead,
Consumer<Throwable> onError,
Consumer<OutputStream> marshaler,
int attemptNumber) {
this.client = client;
this.requestURI = requestURI;
this.headers = headers;
this.compressionEnabled = compressionEnabled;
this.contentType = contentType;
this.contentLength = contentLength;
this.onHttpResponseRead = onHttpResponseRead;
this.onError = onError;
this.marshaler = marshaler;
this.attemptNumber = attemptNumber;
}

@Override
public void handle(HttpClientRequest request) {

HttpClientRequest clientRequest = request.response(new Handler<>() {
@Override
public void handle(AsyncResult<HttpClientResponse> callResult) {
if (callResult.succeeded()) {
HttpClientResponse clientResponse = callResult.result();
clientResponse.body(new Handler<>() {
@Override
public void handle(AsyncResult<Buffer> bodyResult) {
if (bodyResult.succeeded()) {
if (clientResponse.statusCode() >= 500) {
if (attemptNumber <= MAX_ATTEMPTS) {
// we should retry for 5xx error as they might be recoverable
initiateSend(client, requestURI,
MAX_ATTEMPTS - attemptNumber,
newAttempt(),
onError);
return;
}
}
onHttpResponseRead.accept(new Response() {
@Override
public int statusCode() {
return clientResponse.statusCode();
}

@Override
public String statusMessage() {
return clientResponse.statusMessage();
}

@Override
public byte[] responseBody() {
return bodyResult.result().getBytes();
}
});
} else {
if (attemptNumber <= MAX_ATTEMPTS) {
// retry
initiateSend(client, requestURI,
MAX_ATTEMPTS - attemptNumber,
newAttempt(),
onError);
} else {
onError.accept(bodyResult.cause());
}
}
}
});
} else {
if (attemptNumber <= MAX_ATTEMPTS) {
// retry
initiateSend(client, requestURI,
MAX_ATTEMPTS - attemptNumber,
newAttempt(),
onError);
} else {
onError.accept(callResult.cause());
}
}
}
})
.putHeader("Content-Type", contentType);

Buffer buffer = Buffer.buffer(contentLength);
OutputStream os = new BufferOutputStream(buffer);
if (compressionEnabled) {
clientRequest.putHeader("Content-Encoding", "gzip");
try (var gzos = new GZIPOutputStream(os)) {
marshaler.accept(gzos);
} catch (IOException e) {
throw new IllegalStateException(e);
}
} else {
marshaler.accept(os);
}

if (!headers.isEmpty()) {
for (var entry : headers.entrySet()) {
clientRequest.putHeader(entry.getKey(), entry.getValue());
}
}

clientRequest.send(buffer);
}

public ClientRequestSuccessHandler newAttempt() {
return new ClientRequestSuccessHandler(client, requestURI, headers, compressionEnabled,
contentType, contentLength, onHttpResponseRead,
onError, marshaler, attemptNumber + 1);
}
}
}
}
Loading