Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make VertxGrpcExporter more robust #38895

Merged
merged 1 commit into from
Feb 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.quarkus.vertx.core.runtime.BufferOutputStream;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
Expand All @@ -43,6 +45,7 @@ final class VertxGrpcExporter implements SpanExporter {
private static final String GRPC_MESSAGE = "grpc-message";

private static final Logger internalLogger = Logger.getLogger(VertxGrpcExporter.class.getName());
private static final int MAX_ATTEMPTS = 3;

private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger); // TODO: is there something in JBoss Logging we can use?

Expand Down Expand Up @@ -87,30 +90,35 @@ private CompletableResultCode export(TraceRequestMarshaler marshaler, int numIte
exporterMetrics.addSeen(numItems);

var result = new CompletableResultCode();
var onSuccessHandler = new ClientRequestOnSuccessHandler(headers, compressionEnabled, exporterMetrics, marshaler,
loggedUnimplemented, logger, type, numItems, result);
client.request(server)
.onSuccess(onSuccessHandler)
.onFailure(new Handler<>() {
@Override
public void handle(Throwable t) {
// TODO: is there a better way todo retry?
// TODO: should we only retry on a specific errors?

client.request(server)
.onSuccess(onSuccessHandler)
.onFailure(new Handler<>() {
@Override
public void handle(Throwable event) {
failOnClientRequest(numItems, t, result);
}
});
}
});
var onSuccessHandler = new ClientRequestOnSuccessHandler(client, server, headers, compressionEnabled, exporterMetrics,
marshaler,
loggedUnimplemented, logger, type, numItems, result, 1);

initiateSend(client, server, MAX_ATTEMPTS, onSuccessHandler, new Consumer<>() {
@Override
public void accept(Throwable throwable) {
failOnClientRequest(numItems, throwable, result);
}
});

return result;
}

private static void initiateSend(GrpcClient client, SocketAddress server,
int numberOfAttempts,
Handler<GrpcClientRequest<Buffer, Buffer>> onSuccessHandler,
Consumer<Throwable> onFailureCallback) {
Future<GrpcClientRequest<Buffer, Buffer>> reqFuture = client.request(server);
Uni.createFrom().completionStage(reqFuture.toCompletionStage()).onFailure().retry().withBackOff(Duration.ofMillis(100))
.atMost(numberOfAttempts).subscribe().with(
new Consumer<>() {
@Override
public void accept(GrpcClientRequest<Buffer, Buffer> request) {
onSuccessHandler.handle(request);
}
}, onFailureCallback);
}

private void failOnClientRequest(int numItems, Throwable t, CompletableResultCode result) {
exporterMetrics.addFailed(numItems);
logger.log(
Expand Down Expand Up @@ -146,6 +154,8 @@ public CompletableResultCode shutdown() {

geoand marked this conversation as resolved.
Show resolved Hide resolved
private static final class ClientRequestOnSuccessHandler implements Handler<GrpcClientRequest<Buffer, Buffer>> {

private final GrpcClient client;
private final SocketAddress server;
private final Map<String, String> headers;
private final boolean compressionEnabled;
private final ExporterMetrics exporterMetrics;
Expand All @@ -157,15 +167,22 @@ private static final class ClientRequestOnSuccessHandler implements Handler<Grpc
private final int numItems;
private final CompletableResultCode result;

public ClientRequestOnSuccessHandler(Map<String, String> headers,
private final int attemptNumber;

public ClientRequestOnSuccessHandler(GrpcClient client,
SocketAddress server,
Map<String, String> headers,
boolean compressionEnabled,
ExporterMetrics exporterMetrics,
TraceRequestMarshaler marshaler,
AtomicBoolean loggedUnimplemented,
ThrottlingLogger logger,
String type,
int numItems,
CompletableResultCode result) {
CompletableResultCode result,
int attemptNumber) {
this.client = client;
this.server = server;
this.headers = headers;
this.compressionEnabled = compressionEnabled;
this.exporterMetrics = exporterMetrics;
Expand All @@ -175,6 +192,7 @@ public ClientRequestOnSuccessHandler(Map<String, String> headers,
this.type = type;
this.numItems = numItems;
this.result = result;
this.attemptNumber = attemptNumber;
}

@Override
Expand Down Expand Up @@ -205,14 +223,28 @@ public void handle(GrpcClientResponse<Buffer, Buffer> response) {
response.exceptionHandler(new Handler<>() {
@Override
public void handle(Throwable t) {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The stream failed. Full error message: "
+ t.getMessage());
result.fail();
if (attemptNumber <= MAX_ATTEMPTS) {
// retry
initiateSend(client, server,
MAX_ATTEMPTS - attemptNumber,
brunobat marked this conversation as resolved.
Show resolved Hide resolved
newAttempt(),
new Consumer<>() {
@Override
public void accept(Throwable throwable) {
failOnClientRequest(numItems, throwable, result);
}
});

} else {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The stream failed. Full error message: "
+ t.getMessage());
result.fail();
}
}
}).errorHandler(new Handler<>() {
@Override
Expand Down Expand Up @@ -336,14 +368,27 @@ private String getStatusMessage(GrpcClientResponse<Buffer, Buffer> response) {
}).onFailure(new Handler<>() {
@Override
public void handle(Throwable t) {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The request could not be executed. Full error message: "
+ t.getMessage());
result.fail();
if (attemptNumber <= MAX_ATTEMPTS) {
// retry
initiateSend(client, server,
MAX_ATTEMPTS - attemptNumber,
newAttempt(),
new Consumer<>() {
@Override
public void accept(Throwable throwable) {
failOnClientRequest(numItems, throwable, result);
}
});
} else {
exporterMetrics.addFailed(numItems);
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The request could not be executed. Full error message: "
+ t.getMessage());
result.fail();
}
}
});
} catch (IOException e) {
Expand All @@ -357,5 +402,21 @@ public void handle(Throwable t) {
result.fail();
}
}

private void failOnClientRequest(int numItems, Throwable t, CompletableResultCode result) {
exporterMetrics.addFailed(numItems);
brunobat marked this conversation as resolved.
Show resolved Hide resolved
logger.log(
Level.SEVERE,
"Failed to export "
+ type
+ "s. The request could not be executed. Full error message: "
+ t.getMessage());
result.fail();
}

public ClientRequestOnSuccessHandler newAttempt() {
return new ClientRequestOnSuccessHandler(client, server, headers, compressionEnabled, exporterMetrics, marshaler,
loggedUnimplemented, logger, type, numItems, result, attemptNumber + 1);
}
}
}
Loading