Skip to content

Commit

Permalink
triple pojo mode support stream
Browse files Browse the repository at this point in the history
  • Loading branch information
liujianjun.ljj committed Apr 15, 2024
1 parent df27427 commit 837b208
Show file tree
Hide file tree
Showing 26 changed files with 624 additions and 388 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ protected void decorateRequest(SofaRequest request) {

if (!consumerConfig.isGeneric()) {
// 找到调用类型, generic的时候类型在filter里进行判断
request.setInvokeType(consumerConfig.getMethodInvokeType(request));
request.setInvokeType(consumerConfig.getMethodInvokeType(request.getMethodName()));
}

RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,10 +657,6 @@ else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
// 放入线程上下文
RpcInternalContext.getContext().setFuture(future);
response = buildEmptyResponse(request);
} else if (RpcConstants.INVOKER_TYPE_CLIENT_STREAMING.equals(invokeType)
|| RpcConstants.INVOKER_TYPE_BI_STREAMING.equals(invokeType)
|| RpcConstants.INVOKER_TYPE_SERVER_STREAMING.equals(invokeType)) {
response = transport.syncSend(request, Integer.MAX_VALUE);
} else {
throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Unknown invoke type:" + invokeType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,6 @@ public abstract class AbstractInterfaceConfig<T, S extends AbstractInterfaceConf
*/
protected transient volatile Map<String, Object> configValueCache = null;

/**
* 方法调用类型,(方法全名 - 调用类型)
*/
protected transient volatile Map<String, String> methodCallType = null;

/**
* 代理接口类,和T对应,主要针对泛化调用
*/
Expand Down Expand Up @@ -252,21 +247,6 @@ public S setProxyClass(Class proxyClass) {
return castThis();
}

/**
* Cache the call type of interface methods
*/
protected void loadMethodCallType(Class<?> interfaceClass){
Method[] methods = interfaceClass.getDeclaredMethods();
this.methodCallType = new ConcurrentHashMap<>();
for(Method method :methods) {
methodCallType.put(method.getName(),MethodConfig.mapStreamType(method,RpcConstants.INVOKER_TYPE_SYNC));
}
}

public String getMethodCallType(String methodName) {
return methodCallType.get(methodName);
}

/**
* Gets application.
*
Expand Down Expand Up @@ -1035,7 +1015,7 @@ public Object getMethodConfigValue(String methodName, String configKey) {
* @param key the key
* @return the string
*/
protected String buildmkey(String methodName, String key) {
private String buildmkey(String methodName, String key) {
return RpcConstants.HIDE_KEY_PREFIX + methodName + RpcConstants.HIDE_KEY_PREFIX + key;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.alipay.sofa.rpc.common.utils.ExceptionUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.listener.ChannelListener;
import com.alipay.sofa.rpc.listener.ConsumerStateListener;
import com.alipay.sofa.rpc.listener.ProviderInfoListener;
Expand All @@ -39,7 +38,6 @@
import static com.alipay.sofa.rpc.common.RpcConfigs.getBooleanValue;
import static com.alipay.sofa.rpc.common.RpcConfigs.getIntValue;
import static com.alipay.sofa.rpc.common.RpcConfigs.getStringValue;
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_REJECTED_EXECUTION_POLICY;
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_ADDRESS_HOLDER;
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_ADDRESS_WAIT;
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_CHECK;
Expand All @@ -55,6 +53,7 @@
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_LAZY;
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_LOAD_BALANCER;
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_RECONNECT_PERIOD;
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_REJECTED_EXECUTION_POLICY;
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_REPEATED_REFERENCE_LIMIT;
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_RETRIES;
import static com.alipay.sofa.rpc.common.RpcOptions.CONSUMER_STICKY;
Expand Down Expand Up @@ -936,62 +935,12 @@ public SofaResponseCallback getMethodOnreturn(String methodName) {
/**
* Gets the call type corresponding to the method name
*
* @param sofaRequest the request
* @param methodName the method name
* @return the call type
*/
public String getMethodInvokeType(SofaRequest sofaRequest) {
String methodName = sofaRequest.getMethodName();

String invokeType = (String) getMethodConfigValue(methodName, RpcConstants.CONFIG_KEY_INVOKE_TYPE, null);

if (invokeType == null) {
invokeType = getAndCacheCallType(sofaRequest);
}

return invokeType;
}

/**
* Get and cache the call type of certain method
* @param request RPC request
* @return request call type
*/
public String getAndCacheCallType(SofaRequest request) {
Method method = request.getMethod();
String callType = MethodConfig
.mapStreamType(
method,
(String) getMethodConfigValue(request.getMethodName(), RpcConstants.CONFIG_KEY_INVOKE_TYPE,
getInvokeType())
);
//Method level config
updateAttribute(buildMethodConfigKey(request, RpcConstants.CONFIG_KEY_INVOKE_TYPE), callType, true);
return callType;
}

/**
* 通过请求的目标方法构建方法配置key。该key使用内部配置格式。(以'.' 开头)
* @param request RPC请求
* @return 方法配置名称,带方法参数列表
*/
public String buildMethodConfigKey(SofaRequest request, String propertyKey) {
return "." + getMethodSignature(request.getMethod()) + "." + propertyKey;
}

public static String getMethodSignature(Method method) {
Class<?>[] parameterTypes = method.getParameterTypes();
StringBuilder methodSignature = new StringBuilder();
methodSignature.append(method.getName()).append("(");

for (int i = 0; i < parameterTypes.length; i++) {
methodSignature.append(parameterTypes[i].getSimpleName());
if (i < parameterTypes.length - 1) {
methodSignature.append(", ");
}
}

methodSignature.append(")");
return methodSignature.toString();
public String getMethodInvokeType(String methodName) {
return (String) getMethodConfigValue(methodName, RpcConstants.CONFIG_KEY_INVOKE_TYPE,
getInvokeType());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,10 @@
*/
package com.alipay.sofa.rpc.config;

import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.transport.StreamHandler;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -334,37 +329,4 @@ public String getParameter(String key) {
return parameters == null ? null : parameters.get(key);
}

/**
* Gets the stream call type of certain method
* @param method the method
* @return call type,server/client/bidirectional stream or default value. If not mapped to any stream call type, use the default value
*/
public static String mapStreamType(Method method, String defaultValue){
Class<?>[] paramClasses = method.getParameterTypes();
Class<?> returnClass = method.getReturnType();

int paramLen = paramClasses.length;
String callType;

//BidirectionalStream & ClientStream
if(paramLen > 0 && StreamHandler.class.isAssignableFrom(paramClasses[0]) && StreamHandler.class.isAssignableFrom(returnClass)){
if(paramLen > 1){
throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE,"Bidirectional/Client stream method parameters can be only one StreamHandler.");
}
callType = RpcConstants.INVOKER_TYPE_BI_STREAMING;
}
//ServerStream
else if (paramLen > 1 && StreamHandler.class.isAssignableFrom(paramClasses[0]) && void.class == returnClass){
callType = RpcConstants.INVOKER_TYPE_SERVER_STREAMING;
}
else if (StreamHandler.class.isAssignableFrom(returnClass) || Arrays.stream(paramClasses).anyMatch(StreamHandler.class::isAssignableFrom)) {
throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE, "StreamHandler can only at the specified location of parameter. Please check related docs.");
}
//Other call types
else {
callType = defaultValue;
}

return callType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ public T getRef() {
*/
public ProviderConfig<T> setRef(T ref) {
this.ref = ref;
loadMethodCallType(ref.getClass());
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws So

// 修正类型
ConsumerConfig consumerConfig = (ConsumerConfig) invoker.getConfig();
String invokeType = consumerConfig.getMethodInvokeType(request);
String invokeType = consumerConfig.getMethodInvokeType(request.getMethodName());
request.setInvokeType(invokeType);
request.addRequestProp(RemotingConstants.HEAD_INVOKE_TYPE, invokeType);
request.addRequestProp(REVISE_KEY, REVISE_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,18 @@ public ClientStreamObserverAdapter(StreamHandler<Object> streamHandler, byte ser

@Override
public void onNext(triple.Response response) {
byte[] responseDate = response.getData().toByteArray();
byte[] responseData = response.getData().toByteArray();
Object appResponse = null;
String returnTypeName = response.getType();
if (responseDate != null && responseDate.length > 0) {
if (responseData != null && responseData.length > 0) {
if (returnType == null && !returnTypeName.isEmpty()) {
try {
returnType = Class.forName(returnTypeName);
} catch (ClassNotFoundException e) {
throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, "Can not find return type :" + returnType);
}
}
appResponse = serializer.decode(new ByteArrayWrapperByteBuf(responseDate), returnType, null);
appResponse = serializer.decode(new ByteArrayWrapperByteBuf(responseData), returnType, null);
}

streamHandler.onMessage(appResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alipay.sofa.rpc.codec.Serializer;
import com.alipay.sofa.rpc.codec.SerializerFactory;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.transport.StreamHandler;
import com.alipay.sofa.rpc.utils.TripleExceptionUtils;
import com.google.protobuf.ByteString;
Expand All @@ -37,8 +38,10 @@ public class ResponseSerializeStreamHandler<T> implements StreamHandler<T> {

public ResponseSerializeStreamHandler(StreamObserver<triple.Response> streamObserver, String serializeType) {
this.streamObserver = streamObserver;
serializer = SerializerFactory.getSerializer(serializeType);
this.serializeType = serializeType;
if (StringUtils.isNotBlank(serializeType)) {
this.serializer = SerializerFactory.getSerializer(serializeType);
this.serializeType = serializeType;
}
}

@Override
Expand All @@ -61,4 +64,9 @@ public void onException(Throwable throwable) {
streamObserver.onError(TripleExceptionUtils.asStatusRuntimeException(throwable));
}

public void setSerializeType(String serializeType) {
this.serializer = SerializerFactory.getSerializer(serializeType);
this.serializeType = serializeType;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import com.alipay.sofa.rpc.common.cache.ReflectCache;
import com.alipay.sofa.rpc.common.utils.ClassTypeUtils;
import com.alipay.sofa.rpc.common.utils.ClassUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
Expand All @@ -46,8 +44,6 @@
import java.lang.reflect.Method;
import java.util.List;

import static com.alipay.sofa.rpc.common.RpcOptions.DEFAULT_SERIALIZATION;

/**
* @author zhaowang
* @version : GenericServiceImpl.java, v 0.1 2020年05月27日 9:19 下午 zhaowang Exp $
Expand All @@ -58,12 +54,9 @@ public class GenericServiceImpl extends SofaGenericServiceTriple.GenericServiceI

protected UniqueIdInvoker invoker;

private ProviderConfig<?> providerConfig;

public GenericServiceImpl(UniqueIdInvoker invoker, ProviderConfig<?> serverConfig) {
public GenericServiceImpl(UniqueIdInvoker invoker) {
super();
this.invoker = invoker;
this.providerConfig = serverConfig;
}

@Override
Expand Down Expand Up @@ -114,7 +107,7 @@ public StreamObserver<Request> genericBiStream(StreamObserver<Response> response
String methodName = serviceMethod.getName();
try {
ResponseSerializeStreamHandler serverResponseHandler = new ResponseSerializeStreamHandler(responseObserver,
getSerialization());
null);

setBidirectionalStreamRequestParams(sofaRequest, serviceMethod, serverResponseHandler);

Expand All @@ -123,37 +116,41 @@ public StreamObserver<Request> genericBiStream(StreamObserver<Response> response
StreamHandler<Object> clientHandler = (StreamHandler<Object>) sofaResponse.getAppResponse();

return new StreamObserver<Request>() {
volatile Serializer serializer = null;
private volatile Serializer serializer = null;

private volatile String serializeType = null;

volatile Class<?>[] argTypes = null;
private volatile Class<?>[] argTypes = null;

@Override
public void onNext(Request request) {
checkInitialize(request);
Object message = getInvokeArgs(request, argTypes, serializer, false)[0];
serverResponseHandler.setSerializeType(serializeType);
clientHandler.onMessage(message);
}

@Override
public void onError(Throwable t) {
clientHandler.onException(t);
}

@Override
public void onCompleted() {
clientHandler.onFinish();
}

private void checkInitialize(Request request) {
if (serializer == null && argTypes == null) {
synchronized (this) {
if (serializer == null && argTypes == null) {
serializeType = request.getSerializeType();
serializer = SerializerFactory.getSerializer(request.getSerializeType());
argTypes = getArgTypes(request, false);
}
}
}
}

@Override
public void onError(Throwable t) {
clientHandler.onException(t);
}

@Override
public void onCompleted() {
clientHandler.onFinish();
}
};
} catch (Exception e) {
LOGGER.error("Invoke " + methodName + " error:", e);
Expand All @@ -178,7 +175,7 @@ public void genericServerStream(Request request, StreamObserver<Response> respon
Serializer serializer = SerializerFactory.getSerializer(request.getSerializeType());

setUnaryOrServerRequestParams(sofaRequest, request, methodName, serializer, serviceMethod, true);
sofaRequest.getMethodArgs()[0] = new ResponseSerializeStreamHandler<>(responseObserver, getSerialization());
sofaRequest.getMethodArgs()[0] = new ResponseSerializeStreamHandler<>(responseObserver, request.getSerializeType());

invoker.invoke(sofaRequest);
} catch (Exception e) {
Expand Down Expand Up @@ -321,16 +318,4 @@ private Object[] getInvokeArgs(Request request, Class[] argTypes, Serializer ser
}
return args;
}

private String getSerialization() {
String serialization = providerConfig.getSerialization();
if (StringUtils.isBlank(serialization)) {
serialization = getDefaultSerialization();
}
return serialization;
}

private String getDefaultSerialization() {
return DEFAULT_SERIALIZATION;
}
}
Loading

0 comments on commit 837b208

Please sign in to comment.