Skip to content
This repository has been archived by the owner on Apr 4, 2024. It is now read-only.

Commit

Permalink
fix: channel closed and not reopen after an error occured; share same…
Browse files Browse the repository at this point in the history
… ServiceResolver cross different ClientCaller

Signed-off-by: zhiyizhao <zhiyizhao@lightinthebox.com>
  • Loading branch information
zhiyizhao committed Oct 25, 2022
1 parent 0383f3c commit 5214361
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 24 deletions.
6 changes: 5 additions & 1 deletion src/main/java/vn/zalopay/benchmark/GRPCSampler.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private boolean initGrpcRequestSampler(SampleResult sampleResult) {
try {
initGrpcInCurrentThread(sampleResult);
} catch (Exception e) {
log.error(ExceptionUtils.getPrintExceptionToStr(e, null), "UTF-8");
log.error("init grpc request sampler error", e);
generateErrorResultInInitGRPCRequest(sampleResult, e);
return false;
}
Expand Down Expand Up @@ -168,6 +168,7 @@ private void generateSuccessResult(GrpcResponse grpcResponse, SampleResult sampl

private void generateErrorResult(GrpcResponse grpcResponse, SampleResult sampleResult) {
Throwable throwable = grpcResponse.getThrowable();
log.error("request grpc server error", throwable);
sampleResult.setSuccessful(false);
sampleResult.setResponseCode(" 500");
boolean isRuntimeException = throwable instanceof StatusRuntimeException;
Expand All @@ -186,6 +187,9 @@ private void generateStatusRuntimeExceptionResponseData(
Status.Code code = status.getCode();
responseMessage += code.value() + " " + code.name();
responseData = status.getDescription();
if (responseData == null) {
responseData = "";
}
sampleResult.setResponseMessage(responseMessage);
sampleResult.setResponseData(responseData, "UTF-8");
}
Expand Down
87 changes: 64 additions & 23 deletions src/main/java/vn/zalopay/benchmark/core/ClientCaller.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class ClientCaller {
private Descriptors.MethodDescriptor methodDescriptor;
private JsonFormat.TypeRegistry registry;
private DynamicGrpcClient dynamicClient;
private volatile DynamicGrpcClient dynamicClient;
private ImmutableList<DynamicMessage> requestMessages;
private ManagedChannel channel;
private volatile ManagedChannel channel;
private HostAndPort hostAndPort;
private Map<String, String> metadataMap;
private boolean tls;
Expand All @@ -47,6 +49,31 @@ public class ClientCaller {
private final GrpcRequestConfig requestConfig;
ChannelFactory channelFactory;

private static final ConcurrentHashMap<MDKey, ServiceResolver> cache = new ConcurrentHashMap<>();

private static class MDKey {
private final String testProtoFiles;
private final String libFolder;

public MDKey(String testProtoFiles, String libFolder) {
this.testProtoFiles = testProtoFiles;
this.libFolder = libFolder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MDKey mdKey = (MDKey) o;
return Objects.equals(testProtoFiles, mdKey.testProtoFiles) && Objects.equals(libFolder, mdKey.libFolder);
}

@Override
public int hashCode() {
return Objects.hash(testProtoFiles, libFolder);
}
}

public ClientCaller(GrpcRequestConfig requestConfig) {
this.requestConfig = requestConfig;
this.init(
Expand Down Expand Up @@ -76,20 +103,22 @@ private void init(
channelFactory = ChannelFactory.create();
ProtoMethodName grpcMethodName = ProtoMethodName.parseFullGrpcMethodName(fullMethod);

// Fetch the appropriate file descriptors for the service.
final DescriptorProtos.FileDescriptorSet fileDescriptorSet;
ServiceResolver serviceResolver = cache.computeIfAbsent(new MDKey(testProtoFiles, libFolder), mdk -> {
// Fetch the appropriate file descriptors for the service.
final DescriptorProtos.FileDescriptorSet fileDescriptorSet;

try {
fileDescriptorSet = ProtocInvoker.forConfig(testProtoFiles, libFolder).invoke();
} catch (Exception e) {
shutdownNettyChannel();
throw new RuntimeException(
"Unable to resolve service by invoking protoc: \n" + e.getMessage(), e);
}
try {
fileDescriptorSet = ProtocInvoker.forConfig(testProtoFiles, libFolder).invoke();
} catch (Exception e) {
shutdownNettyChannel();
throw new RuntimeException(
"Unable to resolve service by invoking protoc: \n" + e.getMessage(), e);
}

// Set up the dynamic client and make the call.
return ServiceResolver.fromFileDescriptorSet(fileDescriptorSet);
});

// Set up the dynamic client and make the call.
ServiceResolver serviceResolver =
ServiceResolver.fromFileDescriptorSet(fileDescriptorSet);
methodDescriptor = serviceResolver.resolveServiceMethod(grpcMethodName);

createDynamicClient();
Expand Down Expand Up @@ -149,15 +178,17 @@ private Map<String, String> buildHashMetadata(String metadata) {
}

public void createDynamicClient() {
channel =
channelFactory.createChannel(
hostAndPort,
tls,
disableTtlVerification,
metadataMap,
requestConfig.getMaxInboundMessageSize(),
requestConfig.getMaxInboundMetadataSize());
dynamicClient = DynamicGrpcClient.create(methodDescriptor, channel);
if (dynamicClient == null) {
channel =
channelFactory.createChannel(
hostAndPort,
tls,
disableTtlVerification,
metadataMap,
requestConfig.getMaxInboundMessageSize(),
requestConfig.getMaxInboundMetadataSize());
dynamicClient = DynamicGrpcClient.create(methodDescriptor, channel);
}
}

public boolean isShutdown() {
Expand Down Expand Up @@ -193,10 +224,13 @@ public GrpcResponse call(String deadlineMs) {
StreamObserver<DynamicMessage> streamObserver =
ComponentObserver.of(Writer.create(grpcResponse, registry));
try {
createDynamicClient();
dynamicClient
.blockingUnaryCall(requestMessages, streamObserver, callOptions(deadline))
.get();
} catch (Exception e) {
grpcResponse.setSuccess(false);
grpcResponse.setThrowable(e);
shutdownNettyChannel();
}

Expand All @@ -209,6 +243,7 @@ public GrpcResponse callServerStreaming(String deadlineMs) {
StreamObserver<DynamicMessage> streamObserver =
ComponentObserver.of(Writer.create(grpcResponse, registry));
try {
createDynamicClient();
dynamicClient
.callServerStreaming(requestMessages, streamObserver, callOptions(deadline))
.get();
Expand All @@ -225,6 +260,7 @@ public GrpcResponse callClientStreaming(String deadlineMs) {
StreamObserver<DynamicMessage> streamObserver =
ComponentObserver.of(Writer.create(output, registry));
try {
createDynamicClient();
dynamicClient
.callClientStreaming(requestMessages, streamObserver, callOptions(deadline))
.get();
Expand All @@ -245,6 +281,7 @@ public GrpcResponse callBidiStreaming(String deadlineMs) {
StreamObserver<DynamicMessage> streamObserver =
ComponentObserver.of(Writer.create(output, registry));
try {
createDynamicClient();
dynamicClient
.callBidiStreaming(requestMessages, streamObserver, callOptions(deadline))
.get();
Expand Down Expand Up @@ -272,9 +309,13 @@ public void shutdownNettyChannel() {
if (channel != null) {
channel.shutdown();
channel.awaitTermination(awaitTerminationTimeout, TimeUnit.MILLISECONDS);

}
} catch (InterruptedException e) {
throw new RuntimeException("Caught exception while shutting down channel", e);
} finally {
channel = null; // clean channel
dynamicClient = null; // clean client
}
}

Expand Down

0 comments on commit 5214361

Please sign in to comment.