Skip to content

Commit

Permalink
add providerProcessRegister event and record context to bolt
Browse files Browse the repository at this point in the history
  • Loading branch information
liujianjun.ljj committed May 7, 2024
1 parent 266006d commit 437ff33
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,6 @@ public static SofaRequest buildSofaRequest(Class<?> clazz, Method method, Class[
return request;
}

/**
* 根据一个请求的属性复制一个不包含具体方法实参的请求。
* 复制以下属性:请求接口名、请求方法名、请求方法、方法参数类型
*
* @param sofaRequest 被复制的请求实例
*/
public static SofaRequest copyEmptyRequest(SofaRequest sofaRequest) {
SofaRequest request = new SofaRequest();
request.setInterfaceName(sofaRequest.getInterfaceName());
request.setMethodName(sofaRequest.getMethodName());
request.setMethod(sofaRequest.getMethod());
request.setMethodArgSigs(sofaRequest.getMethodArgSigs());
return request;
}

/**
* 构建rpc错误结果
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void generic(Request request, StreamObserver<Response> responseObserver)
ClassLoader serviceClassLoader = invoker.getServiceClassLoader(sofaRequest);
Thread.currentThread().setContextClassLoader(serviceClassLoader);
Serializer serializer = SerializerFactory.getSerializer(request.getSerializeType());
setUnaryOrServerRequestParams(sofaRequest, request, methodName, serializer, declaredMethod, false);
setUnaryOrServerRequestParams(sofaRequest, request, serializer, declaredMethod, false);

SofaResponse response = invoker.invoke(sofaRequest);
Object ret = getAppResponse(declaredMethod, response);
Expand Down Expand Up @@ -183,8 +183,8 @@ public void genericServerStream(Request request, StreamObserver<Response> respon
Thread.currentThread().setContextClassLoader(serviceClassLoader);
Serializer serializer = SerializerFactory.getSerializer(request.getSerializeType());

setUnaryOrServerRequestParams(sofaRequest, request, methodName, serializer, serviceMethod, true);
sofaRequest.getMethodArgs()[0] = new ResponseSerializeStreamHandler<>(responseObserver, request.getSerializeType());
setUnaryOrServerRequestParams(sofaRequest, request, serializer, serviceMethod, true);
sofaRequest.getMethodArgs()[sofaRequest.getMethodArgs().length -1] = new ResponseSerializeStreamHandler<>(responseObserver, request.getSerializeType());

invoker.invoke(sofaRequest);
} catch (Exception e) {
Expand All @@ -200,16 +200,11 @@ public void genericServerStream(Request request, StreamObserver<Response> respon
*
* @param sofaRequest SofaRequest
* @param request Request
* @param methodName MethodName
* @param serializer Serializer
* @param declaredMethod Target invoke method
*/
private void setUnaryOrServerRequestParams(SofaRequest sofaRequest, Request request, String methodName,
private void setUnaryOrServerRequestParams(SofaRequest sofaRequest, Request request,
Serializer serializer, Method declaredMethod, boolean isServerStreamCall) {
if (declaredMethod == null) {
throw new SofaRpcException(RpcErrorType.SERVER_NOT_FOUND_INVOKER, "Cannot find invoke method " +
methodName);
}
Class[] argTypes = getArgTypes(request, isServerStreamCall);
Object[] invokeArgs = getInvokeArgs(request, argTypes, serializer, isServerStreamCall);

Expand Down Expand Up @@ -267,18 +262,14 @@ private Class[] getArgTypes(Request request, boolean addStreamHandler) {
int size = addStreamHandler ? argTypesList.size() + 1 : argTypesList.size();
Class[] argTypes = new Class[size];

if (addStreamHandler) {
argTypes[0] = StreamHandler.class;
}
for (int i = addStreamHandler ? 1 : 0; i < size; i++) {
String typeName;
if (addStreamHandler) {
typeName = argTypesList.get(i - 1);
} else {
typeName = argTypesList.get(i);
}
for (int i = 0; i < argTypesList.size(); i++) {
String typeName = argTypesList.get(i);
argTypes[i] = ClassTypeUtils.getClass(typeName);
}

if (addStreamHandler) {
argTypes[size - 1] = StreamHandler.class;
}
return argTypes;
}

Expand All @@ -291,16 +282,10 @@ private Class[] getArgTypes(Request request, boolean addStreamHandler) {
private Object[] getInvokeArgs(Request request, Class[] argTypes, Serializer serializer, boolean addStreamHandler) {
List<ByteString> argsList = request.getArgsList();
int size = addStreamHandler ? argsList.size() + 1 : argsList.size();
int start = addStreamHandler ? 1 : 0;
Object[] args = new Object[size];

for (int i = start; i < size; i++) {
byte[] data;
if (addStreamHandler) {
data = argsList.get(i - 1).toByteArray();
} else {
data = argsList.get(i).toByteArray();
}
Object[] args = new Object[size];
for (int i = 0; i < argsList.size(); i++) {
byte[] data = argsList.get(i).toByteArray();
args[i] = serializer.decode(new ByteArrayWrapperByteBuf(data), argTypes[i],
null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ private ServiceDescriptor getServiceDescriptor(ServerServiceDefinition template,
private List<MethodDescriptor<Request, Response>> getMethodDescriptor(ProviderConfig providerConfig) {
List<MethodDescriptor<Request, Response>> result = new ArrayList<>();
Set<String> methodNames = SofaProtoUtils.getMethodNames(providerConfig.getInterfaceId());
Map<String, String> streamCallTypeMap = SofaProtoUtils.cacheStreamCallType(providerConfig);
Map<String, String> streamCallTypeMap = SofaProtoUtils.cacheStreamCallType(providerConfig.getProxyClass());
for (String name : methodNames) {
MethodDescriptor.MethodType methodType = SofaProtoUtils.mapGrpcCallType(streamCallTypeMap.get(name));
MethodDescriptor<Request, Response> methodDescriptor = MethodDescriptor.<Request, Response>newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ public Method getDeclaredMethod(SofaRequest sofaRequest, Request request, String
List<String> argTypesList = request.getArgTypesList();
if(RpcConstants.INVOKER_TYPE_SERVER_STREAMING.equals(callType)){
List<String> a = new ArrayList<>(argTypesList.size()+1);
a.add(0, StreamHandler.class.getCanonicalName());
a.addAll(argTypesList);
a.add(StreamHandler.class.getCanonicalName());
argTypesList = a;
}
return ReflectCache.getOverloadMethodCache(uniqueName, sofaRequest.getMethodName(), argTypesList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ public class TripleClientInvoker implements TripleInvoker {
private Serializer serializer;
private String serialization;

private Map<String, Method> methodMap = new ConcurrentHashMap<>();
private final Map<String, Method> methodMap = new ConcurrentHashMap<>();

/**
* 方法调用类型,(方法全名 - 流式调用类型)
* Method call type (Full method name - Streaming call type)
* Since gRPC does not support method overload, each method here will only represent a single call type.
*/
private Map<String, String> methodCallType = null;

Expand All @@ -111,27 +112,10 @@ public TripleClientInvoker(ConsumerConfig consumerConfig, ProviderInfo providerI
LOGGER.error("getSofaStub not found in enclosingClass" + enclosingClass.getName());
}
} else {
methodCallType = SofaProtoUtils.cacheStreamCallType(consumerConfig);
methodCallType = SofaProtoUtils.cacheStreamCallType(consumerConfig.getProxyClass());
}
}

public static Request getRequest(SofaRequest sofaRequest, String serialization, Serializer serializer, int trueParamStart) {
Request.Builder builder = Request.newBuilder();
builder.setSerializeType(serialization);

String[] methodArgSigs = sofaRequest.getMethodArgSigs();
Object[] methodArgs = sofaRequest.getMethodArgs();

for (int i = trueParamStart; i < methodArgSigs.length; i++) {
Object arg = methodArgs[i];
ByteString argByteString = ByteString.copyFrom(serializer.encode(arg, null).array());
builder.addArgs(argByteString);
builder.addArgTypes(methodArgSigs[i]);
}
return builder.build();
}


private void cacheCommonData(ConsumerConfig consumerConfig) {
String serialization = consumerConfig.getSerialization();
if (StringUtils.isBlank(serialization)) {
Expand All @@ -148,19 +132,29 @@ protected String getDefaultSerialization() {
@Override
public SofaResponse invoke(SofaRequest sofaRequest, int timeout)
throws Exception {
if(!useGeneric){
return stubCall(sofaRequest,timeout);
if (!useGeneric) {
return stubCall(sofaRequest, timeout);
}
MethodDescriptor.MethodType callType = mapCallType(sofaRequest.getMethod());
if (callType.equals(MethodDescriptor.MethodType.UNARY)) {
return unaryCall(sofaRequest, timeout);
} else {
return streamCall(sofaRequest, timeout, callType);
switch (callType) {
case UNARY:
return genericUnaryCall(sofaRequest, timeout);
case BIDI_STREAMING:
return genericBinaryStreamCall(sofaRequest, timeout);
case CLIENT_STREAMING:
return genericClientStreamCall(sofaRequest, timeout);
case SERVER_STREAMING:
return genericServerStreamCall(sofaRequest, timeout);
default:
throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE, "Unknown stream call type:" + callType);
}
}

private MethodDescriptor.MethodType mapCallType(Method method) {
String streamType = methodCallType.get(method.getName());
if(StringUtils.isBlank(streamType)) {
return MethodDescriptor.MethodType.UNARY;
}
switch (streamType) {
case RpcConstants.INVOKER_TYPE_BI_STREAMING:
return MethodDescriptor.MethodType.BIDI_STREAMING;
Expand All @@ -173,23 +167,19 @@ private MethodDescriptor.MethodType mapCallType(Method method) {
}
}

private SofaResponse streamCall(SofaRequest sofaRequest, int timeout, MethodDescriptor.MethodType callType) {
switch (callType) {
case BIDI_STREAMING:
return binaryStreamCall(sofaRequest, timeout);
case CLIENT_STREAMING:
return clientStreamCall(sofaRequest, timeout);
case SERVER_STREAMING:
return serverStreamCall(sofaRequest, timeout);
default:
throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE, "Unknown stream call type:" + callType);
}
private SofaResponse stubCall(SofaRequest sofaRequest, int timeout) throws Exception{
SofaResponse sofaResponse = new SofaResponse();
Object stub = sofaStub.invoke(null, channel, buildCustomCallOptions(sofaRequest, timeout),
timeout);
final Method method = sofaRequest.getMethod();
Object appResponse = method.invoke(stub, sofaRequest.getMethodArgs()[0]);
sofaResponse.setAppResponse(appResponse);
return sofaResponse;
}


private SofaResponse unaryCall(SofaRequest sofaRequest, int timeout) throws Exception{
private SofaResponse genericUnaryCall(SofaRequest sofaRequest, int timeout) {
MethodDescriptor methodDescriptor = getMethodDescriptor(sofaRequest);
Request request = getRequest(sofaRequest, serialization, serializer, 0);
Request request = buildRequest(sofaRequest, serialization, serializer, 0);
Response response = (Response) ClientCalls.blockingUnaryCall(channel, methodDescriptor,
buildCustomCallOptions(sofaRequest, timeout), request);

Expand All @@ -206,17 +196,7 @@ private SofaResponse unaryCall(SofaRequest sofaRequest, int timeout) throws Exce
return sofaResponse;
}

private SofaResponse stubCall(SofaRequest sofaRequest, int timeout) throws Exception{
SofaResponse sofaResponse = new SofaResponse();
Object stub = sofaStub.invoke(null, channel, buildCustomCallOptions(sofaRequest, timeout),
timeout);
final Method method = sofaRequest.getMethod();
Object appResponse = method.invoke(stub, sofaRequest.getMethodArgs()[0]);
sofaResponse.setAppResponse(appResponse);
return sofaResponse;
}

private SofaResponse binaryStreamCall(SofaRequest sofaRequest, int timeout) {
private SofaResponse genericBinaryStreamCall(SofaRequest sofaRequest, int timeout) {
StreamHandler streamHandler = (StreamHandler) sofaRequest.getMethodArgs()[0];

MethodDescriptor<Request, Response> methodDescriptor = getMethodDescriptor(sofaRequest);
Expand All @@ -232,11 +212,14 @@ private SofaResponse binaryStreamCall(SofaRequest sofaRequest, int timeout) {
StreamHandler<Request> handler = new StreamHandler() {
@Override
public void onMessage(Object message) {
SofaRequest request = MessageBuilder.copyEmptyRequest(sofaRequest);
Object[] args = new Object[]{message};
SofaRequest request = new SofaRequest();
request.setInterfaceName(sofaRequest.getInterfaceName());
request.setMethodName(sofaRequest.getMethodName());
request.setMethod(sofaRequest.getMethod());
request.setMethodArgs(args);
request.setMethodArgSigs(rebuildTrueRequestArgSigs(args));
Request req = getRequest(request, serialization, serializer, 0);
Request req = buildRequest(request, serialization, serializer, 0);
observer.onNext(req);
}

Expand All @@ -255,17 +238,17 @@ public void onException(Throwable throwable) {
return sofaResponse;
}

private SofaResponse clientStreamCall(SofaRequest sofaRequest, int timeout) {
return binaryStreamCall(sofaRequest, timeout);
private SofaResponse genericClientStreamCall(SofaRequest sofaRequest, int timeout) {
return genericBinaryStreamCall(sofaRequest, timeout);
}

private SofaResponse serverStreamCall(SofaRequest sofaRequest, int timeout) {
StreamHandler streamHandler = (StreamHandler) sofaRequest.getMethodArgs()[0];
private SofaResponse genericServerStreamCall(SofaRequest sofaRequest, int timeout) {
StreamHandler streamHandler = (StreamHandler) sofaRequest.getMethodArgs()[sofaRequest.getMethodArgs().length -1];

MethodDescriptor<Request, Response> methodDescriptor = getMethodDescriptor(sofaRequest);
ClientCall<Request, Response> call = channel.newCall(methodDescriptor, buildCustomCallOptions(sofaRequest, timeout));

Request req = getRequest(sofaRequest, serialization, serializer, 1);
Request req = buildRequest(sofaRequest, serialization, serializer, 1);

ClientStreamObserverAdapter responseObserver = new ClientStreamObserverAdapter(streamHandler, sofaRequest.getSerializeType());

Expand All @@ -274,6 +257,24 @@ private SofaResponse serverStreamCall(SofaRequest sofaRequest, int timeout) {
return new SofaResponse();
}

/**
* Build arg sigs for stream calls.
*
* @param requestArgs request args
* @return arg sigs, arg.getClass().getName().
*/
private String[] rebuildTrueRequestArgSigs(Object[] requestArgs) {
String[] classes = new String[requestArgs.length];
for (int k = 0; k < requestArgs.length; k++) {
if (requestArgs[k] != null) {
classes[k] = requestArgs[k].getClass().getName();
} else {
classes[k] = void.class.getName();
}
}
return classes;
}

@Override
public ResponseFuture asyncInvoke(SofaRequest sofaRequest, int timeout) throws Exception {
SofaResponseCallback sofaResponseCallback = sofaRequest.getSofaResponseCallback();
Expand Down Expand Up @@ -320,7 +321,7 @@ public void onCompleted() {
});
} else {
MethodDescriptor methodDescriptor = getMethodDescriptor(sofaRequest);
Request request = getRequest(sofaRequest, serialization, serializer, 0);
Request request = buildRequest(sofaRequest, serialization, serializer, 0);
ClientCalls.asyncUnaryCall(channel.newCall(methodDescriptor, buildCustomCallOptions(sofaRequest, timeout)), request, new StreamObserver<Object>() {
@Override
public void onNext(Object o) {
Expand All @@ -341,22 +342,20 @@ public void onCompleted() {
return future;
}

/**
* Build arg sigs for stream calls.
*
* @param requestArgs request args
* @return arg sigs, arg.getClass().getName().
*/
private String[] rebuildTrueRequestArgSigs(Object[] requestArgs) {
String[] classes = new String[requestArgs.length];
for (int k = 0; k < requestArgs.length; k++) {
if (requestArgs[k] != null) {
classes[k] = requestArgs[k].getClass().getName();
} else {
classes[k] = void.class.getName();
}
public static Request buildRequest(SofaRequest sofaRequest, String serialization, Serializer serializer, int backOffset) {
Request.Builder builder = Request.newBuilder();
builder.setSerializeType(serialization);

String[] methodArgSigs = sofaRequest.getMethodArgSigs();
Object[] methodArgs = sofaRequest.getMethodArgs();

for (int i = 0; i < methodArgSigs.length - backOffset; i++) {
Object arg = methodArgs[i];
ByteString argByteString = ByteString.copyFrom(serializer.encode(arg, null).array());
builder.addArgs(argByteString);
builder.addArgTypes(methodArgSigs[i]);
}
return classes;
return builder.build();
}

private void processSuccess(boolean needDecode, RpcInternalContext context, SofaRequest sofaRequest, Object o, SofaResponseCallback sofaResponseCallback, TripleResponseFuture future, ClassLoader classLoader) {
Expand Down
Loading

0 comments on commit 437ff33

Please sign in to comment.