Skip to content
This repository has been archived by the owner on Apr 4, 2024. It is now read-only.

Commit

Permalink
fix: ClientCaller 增加全局ServiceResolver缓存
Browse files Browse the repository at this point in the history
fix: ClientCaller
在请求出现异常时,会关闭channel,导致后续请求全部失败的bug
fix: 优化几处异常日志输出

fix: breaking gRPC plugin with io.grpc version 1.49.0 (#187)

* feat: update pom.xml to support JMeter version 5.5
* fix: revert gRPC version to 1.38.0

Co-authored-by: minhhn3 <minhhn3@vng.com.vn>
Signed-off-by: zhiyizhao <zhiyizhao@lightinthebox.com>
  • Loading branch information
2 people authored and zhiyizhao committed Oct 24, 2022
1 parent e8bce58 commit a8e95ce
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 27 deletions.
17 changes: 14 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<lombok.version>1.18.24</lombok.version>
<jmeter.version>5.4.3</jmeter.version>
<commons.lang3.version>3.12.0</commons.lang3.version>
<org.apache.commons.io.version>2.11.0</org.apache.commons.io.version>
<jmeter.version>5.5</jmeter.version>
<netty.ssl.version>2.0.54.Final</netty.ssl.version>
<minhhoang.protoc.version>3.21.4</minhhoang.protoc.version>
<grpc.version>1.49.0</grpc.version>
<grpc.version>1.38.0</grpc.version>
<protobuf.version>3.17.1</protobuf.version>
<jmeter.cmn.version>0.7</jmeter.cmn.version>
<emulators.jmeter.version>0.4</emulators.jmeter.version>
Expand Down Expand Up @@ -58,7 +60,6 @@
<groupId>kg.apc</groupId>
<artifactId>jmeter-plugins-cmn-jmeter</artifactId>
<version>${jmeter.cmn.version}</version>
<scope>provided</scope>
</dependency>

<!-- proto -->
Expand Down Expand Up @@ -104,6 +105,16 @@
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty.ssl.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons.lang3.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${org.apache.commons.io.version}</version>
</dependency>

<!-- json -->
<dependency>
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/vn/zalopay/benchmark/GRPCSampler.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private boolean initGrpcRequestSampler(SampleResult sampleResult) {
try {
initGrpcInCurrentThread(sampleResult);
} catch (Exception e) {
log.error(ExceptionUtils.getPrintExceptionToStr(e, null), "UTF-8");
log.error("init grpc request sampler error", e);
generateErrorResultInInitGRPCRequest(sampleResult, e);
return false;
}
Expand Down Expand Up @@ -168,6 +168,7 @@ private void generateSuccessResult(GrpcResponse grpcResponse, SampleResult sampl

private void generateErrorResult(GrpcResponse grpcResponse, SampleResult sampleResult) {
Throwable throwable = grpcResponse.getThrowable();
log.error("request grpc server error", throwable);
sampleResult.setSuccessful(false);
sampleResult.setResponseCode(" 500");
boolean isRuntimeException = throwable instanceof StatusRuntimeException;
Expand All @@ -186,6 +187,9 @@ private void generateStatusRuntimeExceptionResponseData(
Status.Code code = status.getCode();
responseMessage += code.value() + " " + code.name();
responseData = status.getDescription();
if (responseData == null) {
responseData = "";
}
sampleResult.setResponseMessage(responseMessage);
sampleResult.setResponseData(responseData, "UTF-8");
}
Expand Down
87 changes: 64 additions & 23 deletions src/main/java/vn/zalopay/benchmark/core/ClientCaller.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class ClientCaller {
private Descriptors.MethodDescriptor methodDescriptor;
private JsonFormat.TypeRegistry registry;
private DynamicGrpcClient dynamicClient;
private volatile DynamicGrpcClient dynamicClient;
private ImmutableList<DynamicMessage> requestMessages;
private ManagedChannel channel;
private volatile ManagedChannel channel;
private HostAndPort hostAndPort;
private Map<String, String> metadataMap;
private boolean tls;
Expand All @@ -47,6 +49,31 @@ public class ClientCaller {
private final GrpcRequestConfig requestConfig;
ChannelFactory channelFactory;

private static final ConcurrentHashMap<MDKey, ServiceResolver> cache = new ConcurrentHashMap<>();

private static class MDKey {
private final String testProtoFiles;
private final String libFolder;

public MDKey(String testProtoFiles, String libFolder) {
this.testProtoFiles = testProtoFiles;
this.libFolder = libFolder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MDKey mdKey = (MDKey) o;
return Objects.equals(testProtoFiles, mdKey.testProtoFiles) && Objects.equals(libFolder, mdKey.libFolder);
}

@Override
public int hashCode() {
return Objects.hash(testProtoFiles, libFolder);
}
}

public ClientCaller(GrpcRequestConfig requestConfig) {
this.requestConfig = requestConfig;
this.init(
Expand Down Expand Up @@ -76,20 +103,22 @@ private void init(
channelFactory = ChannelFactory.create();
ProtoMethodName grpcMethodName = ProtoMethodName.parseFullGrpcMethodName(fullMethod);

// Fetch the appropriate file descriptors for the service.
final DescriptorProtos.FileDescriptorSet fileDescriptorSet;
ServiceResolver serviceResolver = cache.computeIfAbsent(new MDKey(testProtoFiles, libFolder), mdk -> {
// Fetch the appropriate file descriptors for the service.
final DescriptorProtos.FileDescriptorSet fileDescriptorSet;

try {
fileDescriptorSet = ProtocInvoker.forConfig(testProtoFiles, libFolder).invoke();
} catch (Exception e) {
shutdownNettyChannel();
throw new RuntimeException(
"Unable to resolve service by invoking protoc: \n" + e.getMessage(), e);
}
try {
fileDescriptorSet = ProtocInvoker.forConfig(testProtoFiles, libFolder).invoke();
} catch (Exception e) {
shutdownNettyChannel();
throw new RuntimeException(
"Unable to resolve service by invoking protoc: \n" + e.getMessage(), e);
}

// Set up the dynamic client and make the call.
return ServiceResolver.fromFileDescriptorSet(fileDescriptorSet);
});

// Set up the dynamic client and make the call.
ServiceResolver serviceResolver =
ServiceResolver.fromFileDescriptorSet(fileDescriptorSet);
methodDescriptor = serviceResolver.resolveServiceMethod(grpcMethodName);

createDynamicClient();
Expand Down Expand Up @@ -149,15 +178,17 @@ private Map<String, String> buildHashMetadata(String metadata) {
}

public void createDynamicClient() {
channel =
channelFactory.createChannel(
hostAndPort,
tls,
disableTtlVerification,
metadataMap,
requestConfig.getMaxInboundMessageSize(),
requestConfig.getMaxInboundMetadataSize());
dynamicClient = DynamicGrpcClient.create(methodDescriptor, channel);
if (dynamicClient == null) {
channel =
channelFactory.createChannel(
hostAndPort,
tls,
disableTtlVerification,
metadataMap,
requestConfig.getMaxInboundMessageSize(),
requestConfig.getMaxInboundMetadataSize());
dynamicClient = DynamicGrpcClient.create(methodDescriptor, channel);
}
}

public boolean isShutdown() {
Expand Down Expand Up @@ -193,10 +224,13 @@ public GrpcResponse call(String deadlineMs) {
StreamObserver<DynamicMessage> streamObserver =
ComponentObserver.of(Writer.create(grpcResponse, registry));
try {
createDynamicClient();
dynamicClient
.blockingUnaryCall(requestMessages, streamObserver, callOptions(deadline))
.get();
} catch (Exception e) {
grpcResponse.setSuccess(false);
grpcResponse.setThrowable(e);
shutdownNettyChannel();
}

Expand All @@ -209,6 +243,7 @@ public GrpcResponse callServerStreaming(String deadlineMs) {
StreamObserver<DynamicMessage> streamObserver =
ComponentObserver.of(Writer.create(grpcResponse, registry));
try {
createDynamicClient();
dynamicClient
.callServerStreaming(requestMessages, streamObserver, callOptions(deadline))
.get();
Expand All @@ -225,6 +260,7 @@ public GrpcResponse callClientStreaming(String deadlineMs) {
StreamObserver<DynamicMessage> streamObserver =
ComponentObserver.of(Writer.create(output, registry));
try {
createDynamicClient();
dynamicClient
.callClientStreaming(requestMessages, streamObserver, callOptions(deadline))
.get();
Expand All @@ -245,6 +281,7 @@ public GrpcResponse callBidiStreaming(String deadlineMs) {
StreamObserver<DynamicMessage> streamObserver =
ComponentObserver.of(Writer.create(output, registry));
try {
createDynamicClient();
dynamicClient
.callBidiStreaming(requestMessages, streamObserver, callOptions(deadline))
.get();
Expand Down Expand Up @@ -272,9 +309,13 @@ public void shutdownNettyChannel() {
if (channel != null) {
channel.shutdown();
channel.awaitTermination(awaitTerminationTimeout, TimeUnit.MILLISECONDS);

}
} catch (InterruptedException e) {
throw new RuntimeException("Caught exception while shutting down channel", e);
} finally {
channel = null; // 清理channel
dynamicClient = null; // 清理client
}
}

Expand Down

0 comments on commit a8e95ce

Please sign in to comment.