From 437ff33cfa8795f89b06916c4aeb307373d27270 Mon Sep 17 00:00:00 2001 From: "liujianjun.ljj" Date: Tue, 7 May 2024 20:13:57 +0800 Subject: [PATCH] add providerProcessRegister event and record context to bolt --- .../sofa/rpc/message/MessageBuilder.java | 15 -- .../rpc/server/triple/GenericServiceImpl.java | 41 ++--- .../sofa/rpc/server/triple/TripleServer.java | 2 +- .../rpc/server/triple/UniqueIdInvoker.java | 2 +- .../transport/triple/TripleClientInvoker.java | 147 +++++++++--------- .../alipay/sofa/rpc/utils/SofaProtoUtils.java | 35 +---- .../server/triple/GenericServiceImplTest.java | 2 +- .../rpc/test/triple/stream/ClientRequest.java | 8 +- .../triple/stream/ExtendClientRequest.java | 4 +- .../rpc/test/triple/stream/HelloService.java | 4 +- .../test/triple/stream/HelloServiceImpl.java | 6 +- .../stream/TripleGenericStreamTest.java | 14 +- 12 files changed, 108 insertions(+), 172 deletions(-) diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/message/MessageBuilder.java b/core/api/src/main/java/com/alipay/sofa/rpc/message/MessageBuilder.java index c143d5742..6607208fc 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/message/MessageBuilder.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/message/MessageBuilder.java @@ -69,21 +69,6 @@ public static SofaRequest buildSofaRequest(Class clazz, Method method, Class[ return request; } - /** - * 根据一个请求的属性复制一个不包含具体方法实参的请求。 - * 复制以下属性:请求接口名、请求方法名、请求方法、方法参数类型 - * - * @param sofaRequest 被复制的请求实例 - */ - public static SofaRequest copyEmptyRequest(SofaRequest sofaRequest) { - SofaRequest request = new SofaRequest(); - request.setInterfaceName(sofaRequest.getInterfaceName()); - request.setMethodName(sofaRequest.getMethodName()); - request.setMethod(sofaRequest.getMethod()); - request.setMethodArgSigs(sofaRequest.getMethodArgSigs()); - return request; - } - /** * 构建rpc错误结果 * diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/GenericServiceImpl.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/GenericServiceImpl.java index e74461203..c28fd4044 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/GenericServiceImpl.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/GenericServiceImpl.java @@ -75,7 +75,7 @@ public void generic(Request request, StreamObserver responseObserver) ClassLoader serviceClassLoader = invoker.getServiceClassLoader(sofaRequest); Thread.currentThread().setContextClassLoader(serviceClassLoader); Serializer serializer = SerializerFactory.getSerializer(request.getSerializeType()); - setUnaryOrServerRequestParams(sofaRequest, request, methodName, serializer, declaredMethod, false); + setUnaryOrServerRequestParams(sofaRequest, request, serializer, declaredMethod, false); SofaResponse response = invoker.invoke(sofaRequest); Object ret = getAppResponse(declaredMethod, response); @@ -183,8 +183,8 @@ public void genericServerStream(Request request, StreamObserver respon Thread.currentThread().setContextClassLoader(serviceClassLoader); Serializer serializer = SerializerFactory.getSerializer(request.getSerializeType()); - setUnaryOrServerRequestParams(sofaRequest, request, methodName, serializer, serviceMethod, true); - sofaRequest.getMethodArgs()[0] = new ResponseSerializeStreamHandler<>(responseObserver, request.getSerializeType()); + setUnaryOrServerRequestParams(sofaRequest, request, serializer, serviceMethod, true); + sofaRequest.getMethodArgs()[sofaRequest.getMethodArgs().length -1] = new ResponseSerializeStreamHandler<>(responseObserver, request.getSerializeType()); invoker.invoke(sofaRequest); } catch (Exception e) { @@ -200,16 +200,11 @@ public void genericServerStream(Request request, StreamObserver respon * * @param sofaRequest SofaRequest * @param request Request - * @param methodName MethodName * @param serializer Serializer * @param declaredMethod Target invoke method */ - private void setUnaryOrServerRequestParams(SofaRequest sofaRequest, Request request, String methodName, + private void setUnaryOrServerRequestParams(SofaRequest sofaRequest, Request request, Serializer serializer, Method declaredMethod, boolean isServerStreamCall) { - if (declaredMethod == null) { - throw new SofaRpcException(RpcErrorType.SERVER_NOT_FOUND_INVOKER, "Cannot find invoke method " + - methodName); - } Class[] argTypes = getArgTypes(request, isServerStreamCall); Object[] invokeArgs = getInvokeArgs(request, argTypes, serializer, isServerStreamCall); @@ -267,18 +262,14 @@ private Class[] getArgTypes(Request request, boolean addStreamHandler) { int size = addStreamHandler ? argTypesList.size() + 1 : argTypesList.size(); Class[] argTypes = new Class[size]; - if (addStreamHandler) { - argTypes[0] = StreamHandler.class; - } - for (int i = addStreamHandler ? 1 : 0; i < size; i++) { - String typeName; - if (addStreamHandler) { - typeName = argTypesList.get(i - 1); - } else { - typeName = argTypesList.get(i); - } + for (int i = 0; i < argTypesList.size(); i++) { + String typeName = argTypesList.get(i); argTypes[i] = ClassTypeUtils.getClass(typeName); } + + if (addStreamHandler) { + argTypes[size - 1] = StreamHandler.class; + } return argTypes; } @@ -291,16 +282,10 @@ private Class[] getArgTypes(Request request, boolean addStreamHandler) { private Object[] getInvokeArgs(Request request, Class[] argTypes, Serializer serializer, boolean addStreamHandler) { List argsList = request.getArgsList(); int size = addStreamHandler ? argsList.size() + 1 : argsList.size(); - int start = addStreamHandler ? 1 : 0; - Object[] args = new Object[size]; - for (int i = start; i < size; i++) { - byte[] data; - if (addStreamHandler) { - data = argsList.get(i - 1).toByteArray(); - } else { - data = argsList.get(i).toByteArray(); - } + Object[] args = new Object[size]; + for (int i = 0; i < argsList.size(); i++) { + byte[] data = argsList.get(i).toByteArray(); args[i] = serializer.decode(new ByteArrayWrapperByteBuf(data), argTypes[i], null); } diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServer.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServer.java index 3825c01fa..47f225352 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServer.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServer.java @@ -376,7 +376,7 @@ private ServiceDescriptor getServiceDescriptor(ServerServiceDefinition template, private List> getMethodDescriptor(ProviderConfig providerConfig) { List> result = new ArrayList<>(); Set methodNames = SofaProtoUtils.getMethodNames(providerConfig.getInterfaceId()); - Map streamCallTypeMap = SofaProtoUtils.cacheStreamCallType(providerConfig); + Map streamCallTypeMap = SofaProtoUtils.cacheStreamCallType(providerConfig.getProxyClass()); for (String name : methodNames) { MethodDescriptor.MethodType methodType = SofaProtoUtils.mapGrpcCallType(streamCallTypeMap.get(name)); MethodDescriptor methodDescriptor = MethodDescriptor.newBuilder() diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/UniqueIdInvoker.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/UniqueIdInvoker.java index 5cbf27db8..a2279ab61 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/UniqueIdInvoker.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/UniqueIdInvoker.java @@ -183,8 +183,8 @@ public Method getDeclaredMethod(SofaRequest sofaRequest, Request request, String List argTypesList = request.getArgTypesList(); if(RpcConstants.INVOKER_TYPE_SERVER_STREAMING.equals(callType)){ List a = new ArrayList<>(argTypesList.size()+1); - a.add(0, StreamHandler.class.getCanonicalName()); a.addAll(argTypesList); + a.add(StreamHandler.class.getCanonicalName()); argTypesList = a; } return ReflectCache.getOverloadMethodCache(uniqueName, sofaRequest.getMethodName(), argTypesList diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientInvoker.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientInvoker.java index 142293883..a77f7d248 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientInvoker.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientInvoker.java @@ -88,10 +88,11 @@ public class TripleClientInvoker implements TripleInvoker { private Serializer serializer; private String serialization; - private Map methodMap = new ConcurrentHashMap<>(); + private final Map methodMap = new ConcurrentHashMap<>(); /** - * 方法调用类型,(方法全名 - 流式调用类型) + * Method call type (Full method name - Streaming call type) + * Since gRPC does not support method overload, each method here will only represent a single call type. */ private Map methodCallType = null; @@ -111,27 +112,10 @@ public TripleClientInvoker(ConsumerConfig consumerConfig, ProviderInfo providerI LOGGER.error("getSofaStub not found in enclosingClass" + enclosingClass.getName()); } } else { - methodCallType = SofaProtoUtils.cacheStreamCallType(consumerConfig); + methodCallType = SofaProtoUtils.cacheStreamCallType(consumerConfig.getProxyClass()); } } - public static Request getRequest(SofaRequest sofaRequest, String serialization, Serializer serializer, int trueParamStart) { - Request.Builder builder = Request.newBuilder(); - builder.setSerializeType(serialization); - - String[] methodArgSigs = sofaRequest.getMethodArgSigs(); - Object[] methodArgs = sofaRequest.getMethodArgs(); - - for (int i = trueParamStart; i < methodArgSigs.length; i++) { - Object arg = methodArgs[i]; - ByteString argByteString = ByteString.copyFrom(serializer.encode(arg, null).array()); - builder.addArgs(argByteString); - builder.addArgTypes(methodArgSigs[i]); - } - return builder.build(); - } - - private void cacheCommonData(ConsumerConfig consumerConfig) { String serialization = consumerConfig.getSerialization(); if (StringUtils.isBlank(serialization)) { @@ -148,19 +132,29 @@ protected String getDefaultSerialization() { @Override public SofaResponse invoke(SofaRequest sofaRequest, int timeout) throws Exception { - if(!useGeneric){ - return stubCall(sofaRequest,timeout); + if (!useGeneric) { + return stubCall(sofaRequest, timeout); } MethodDescriptor.MethodType callType = mapCallType(sofaRequest.getMethod()); - if (callType.equals(MethodDescriptor.MethodType.UNARY)) { - return unaryCall(sofaRequest, timeout); - } else { - return streamCall(sofaRequest, timeout, callType); + switch (callType) { + case UNARY: + return genericUnaryCall(sofaRequest, timeout); + case BIDI_STREAMING: + return genericBinaryStreamCall(sofaRequest, timeout); + case CLIENT_STREAMING: + return genericClientStreamCall(sofaRequest, timeout); + case SERVER_STREAMING: + return genericServerStreamCall(sofaRequest, timeout); + default: + throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE, "Unknown stream call type:" + callType); } } private MethodDescriptor.MethodType mapCallType(Method method) { String streamType = methodCallType.get(method.getName()); + if(StringUtils.isBlank(streamType)) { + return MethodDescriptor.MethodType.UNARY; + } switch (streamType) { case RpcConstants.INVOKER_TYPE_BI_STREAMING: return MethodDescriptor.MethodType.BIDI_STREAMING; @@ -173,23 +167,19 @@ private MethodDescriptor.MethodType mapCallType(Method method) { } } - private SofaResponse streamCall(SofaRequest sofaRequest, int timeout, MethodDescriptor.MethodType callType) { - switch (callType) { - case BIDI_STREAMING: - return binaryStreamCall(sofaRequest, timeout); - case CLIENT_STREAMING: - return clientStreamCall(sofaRequest, timeout); - case SERVER_STREAMING: - return serverStreamCall(sofaRequest, timeout); - default: - throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE, "Unknown stream call type:" + callType); - } + private SofaResponse stubCall(SofaRequest sofaRequest, int timeout) throws Exception{ + SofaResponse sofaResponse = new SofaResponse(); + Object stub = sofaStub.invoke(null, channel, buildCustomCallOptions(sofaRequest, timeout), + timeout); + final Method method = sofaRequest.getMethod(); + Object appResponse = method.invoke(stub, sofaRequest.getMethodArgs()[0]); + sofaResponse.setAppResponse(appResponse); + return sofaResponse; } - - private SofaResponse unaryCall(SofaRequest sofaRequest, int timeout) throws Exception{ + private SofaResponse genericUnaryCall(SofaRequest sofaRequest, int timeout) { MethodDescriptor methodDescriptor = getMethodDescriptor(sofaRequest); - Request request = getRequest(sofaRequest, serialization, serializer, 0); + Request request = buildRequest(sofaRequest, serialization, serializer, 0); Response response = (Response) ClientCalls.blockingUnaryCall(channel, methodDescriptor, buildCustomCallOptions(sofaRequest, timeout), request); @@ -206,17 +196,7 @@ private SofaResponse unaryCall(SofaRequest sofaRequest, int timeout) throws Exce return sofaResponse; } - private SofaResponse stubCall(SofaRequest sofaRequest, int timeout) throws Exception{ - SofaResponse sofaResponse = new SofaResponse(); - Object stub = sofaStub.invoke(null, channel, buildCustomCallOptions(sofaRequest, timeout), - timeout); - final Method method = sofaRequest.getMethod(); - Object appResponse = method.invoke(stub, sofaRequest.getMethodArgs()[0]); - sofaResponse.setAppResponse(appResponse); - return sofaResponse; - } - - private SofaResponse binaryStreamCall(SofaRequest sofaRequest, int timeout) { + private SofaResponse genericBinaryStreamCall(SofaRequest sofaRequest, int timeout) { StreamHandler streamHandler = (StreamHandler) sofaRequest.getMethodArgs()[0]; MethodDescriptor methodDescriptor = getMethodDescriptor(sofaRequest); @@ -232,11 +212,14 @@ private SofaResponse binaryStreamCall(SofaRequest sofaRequest, int timeout) { StreamHandler handler = new StreamHandler() { @Override public void onMessage(Object message) { - SofaRequest request = MessageBuilder.copyEmptyRequest(sofaRequest); Object[] args = new Object[]{message}; + SofaRequest request = new SofaRequest(); + request.setInterfaceName(sofaRequest.getInterfaceName()); + request.setMethodName(sofaRequest.getMethodName()); + request.setMethod(sofaRequest.getMethod()); request.setMethodArgs(args); request.setMethodArgSigs(rebuildTrueRequestArgSigs(args)); - Request req = getRequest(request, serialization, serializer, 0); + Request req = buildRequest(request, serialization, serializer, 0); observer.onNext(req); } @@ -255,17 +238,17 @@ public void onException(Throwable throwable) { return sofaResponse; } - private SofaResponse clientStreamCall(SofaRequest sofaRequest, int timeout) { - return binaryStreamCall(sofaRequest, timeout); + private SofaResponse genericClientStreamCall(SofaRequest sofaRequest, int timeout) { + return genericBinaryStreamCall(sofaRequest, timeout); } - private SofaResponse serverStreamCall(SofaRequest sofaRequest, int timeout) { - StreamHandler streamHandler = (StreamHandler) sofaRequest.getMethodArgs()[0]; + private SofaResponse genericServerStreamCall(SofaRequest sofaRequest, int timeout) { + StreamHandler streamHandler = (StreamHandler) sofaRequest.getMethodArgs()[sofaRequest.getMethodArgs().length -1]; MethodDescriptor methodDescriptor = getMethodDescriptor(sofaRequest); ClientCall call = channel.newCall(methodDescriptor, buildCustomCallOptions(sofaRequest, timeout)); - Request req = getRequest(sofaRequest, serialization, serializer, 1); + Request req = buildRequest(sofaRequest, serialization, serializer, 1); ClientStreamObserverAdapter responseObserver = new ClientStreamObserverAdapter(streamHandler, sofaRequest.getSerializeType()); @@ -274,6 +257,24 @@ private SofaResponse serverStreamCall(SofaRequest sofaRequest, int timeout) { return new SofaResponse(); } + /** + * Build arg sigs for stream calls. + * + * @param requestArgs request args + * @return arg sigs, arg.getClass().getName(). + */ + private String[] rebuildTrueRequestArgSigs(Object[] requestArgs) { + String[] classes = new String[requestArgs.length]; + for (int k = 0; k < requestArgs.length; k++) { + if (requestArgs[k] != null) { + classes[k] = requestArgs[k].getClass().getName(); + } else { + classes[k] = void.class.getName(); + } + } + return classes; + } + @Override public ResponseFuture asyncInvoke(SofaRequest sofaRequest, int timeout) throws Exception { SofaResponseCallback sofaResponseCallback = sofaRequest.getSofaResponseCallback(); @@ -320,7 +321,7 @@ public void onCompleted() { }); } else { MethodDescriptor methodDescriptor = getMethodDescriptor(sofaRequest); - Request request = getRequest(sofaRequest, serialization, serializer, 0); + Request request = buildRequest(sofaRequest, serialization, serializer, 0); ClientCalls.asyncUnaryCall(channel.newCall(methodDescriptor, buildCustomCallOptions(sofaRequest, timeout)), request, new StreamObserver() { @Override public void onNext(Object o) { @@ -341,22 +342,20 @@ public void onCompleted() { return future; } - /** - * Build arg sigs for stream calls. - * - * @param requestArgs request args - * @return arg sigs, arg.getClass().getName(). - */ - private String[] rebuildTrueRequestArgSigs(Object[] requestArgs) { - String[] classes = new String[requestArgs.length]; - for (int k = 0; k < requestArgs.length; k++) { - if (requestArgs[k] != null) { - classes[k] = requestArgs[k].getClass().getName(); - } else { - classes[k] = void.class.getName(); - } + public static Request buildRequest(SofaRequest sofaRequest, String serialization, Serializer serializer, int backOffset) { + Request.Builder builder = Request.newBuilder(); + builder.setSerializeType(serialization); + + String[] methodArgSigs = sofaRequest.getMethodArgSigs(); + Object[] methodArgs = sofaRequest.getMethodArgs(); + + for (int i = 0; i < methodArgSigs.length - backOffset; i++) { + Object arg = methodArgs[i]; + ByteString argByteString = ByteString.copyFrom(serializer.encode(arg, null).array()); + builder.addArgs(argByteString); + builder.addArgTypes(methodArgSigs[i]); } - return classes; + return builder.build(); } private void processSuccess(boolean needDecode, RpcInternalContext context, SofaRequest sofaRequest, Object o, SofaResponseCallback sofaResponseCallback, TripleResponseFuture future, ClassLoader classLoader) { diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/utils/SofaProtoUtils.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/utils/SofaProtoUtils.java index c9a575f42..7cc35ea23 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/utils/SofaProtoUtils.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/utils/SofaProtoUtils.java @@ -20,7 +20,6 @@ import com.alipay.sofa.rpc.common.utils.ClassUtils; import com.alipay.sofa.rpc.common.utils.StringUtils; import com.alipay.sofa.rpc.config.ConsumerConfig; -import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.core.exception.RpcErrorType; import com.alipay.sofa.rpc.core.exception.SofaRpcException; import com.alipay.sofa.rpc.core.request.SofaRequest; @@ -93,22 +92,8 @@ public static MethodDescriptor.MethodType mapGrpcCallType(String callType) { } } - public static Map cacheStreamCallType(ConsumerConfig consumerConfig) { + public static Map cacheStreamCallType(Class proxyClass) { Map methodCallType = new ConcurrentHashMap<>(); - Class proxyClass = consumerConfig.getProxyClass(); - Method[] declaredMethods = proxyClass.getDeclaredMethods(); - for (Method method : declaredMethods) { - String streamType = mapStreamType(method); - if (StringUtils.isNotBlank(streamType)) { - methodCallType.put(method.getName(), streamType); - } - } - return methodCallType; - } - - public static Map cacheStreamCallType(ProviderConfig providerConfig) { - Map methodCallType = new ConcurrentHashMap<>(); - Class proxyClass = providerConfig.getProxyClass(); Method[] declaredMethods = proxyClass.getDeclaredMethods(); for (Method method : declaredMethods) { String streamType = mapStreamType(method); @@ -139,7 +124,7 @@ private static String mapStreamType(Method method) { return RpcConstants.INVOKER_TYPE_BI_STREAMING; } //ServerStream - else if (paramLen > 1 && StreamHandler.class.isAssignableFrom(paramClasses[0]) && void.class == returnClass) { + else if (paramLen > 1 && StreamHandler.class.isAssignableFrom(paramClasses[paramLen -1]) && void.class == returnClass) { return RpcConstants.INVOKER_TYPE_SERVER_STREAMING; } else if (StreamHandler.class.isAssignableFrom(returnClass) || Arrays.stream(paramClasses).anyMatch(StreamHandler.class::isAssignableFrom)) { throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE, "StreamHandler can only at the specified location of parameter. Please check related docs."); @@ -147,22 +132,6 @@ else if (paramLen > 1 && StreamHandler.class.isAssignableFrom(paramClasses[0]) & return null; } - /** - * Get and cache the call type of certain method - * @param request RPC request - * @return request call type - */ - /* - public String getAndCacheCallType(SofaRequest request) { - Method method = request.getMethod(); - String callType = MethodConfig - .mapStreamType( - method, null); - //Method level config - updateAttribute(buildMethodConfigKey(request, RpcConstants.CONFIG_KEY_INVOKE_TYPE), callType, true); - return callType; - }*/ - /** * 通过请求的目标方法构建方法配置key。该key使用内部配置格式。(以'.' 开头) * diff --git a/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/server/triple/GenericServiceImplTest.java b/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/server/triple/GenericServiceImplTest.java index a45105de6..ba06f9d5b 100644 --- a/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/server/triple/GenericServiceImplTest.java +++ b/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/server/triple/GenericServiceImplTest.java @@ -136,7 +136,7 @@ private Object getReturnValue(Method method) { private Request buildRequest(Method method, Object[] args) { Class[] parameterTypes = method.getParameterTypes(); SofaRequest sofaRequest = MessageBuilder.buildSofaRequest(HelloService.class, method, parameterTypes, args); - Request request = TripleClientInvoker.getRequest(sofaRequest, serialization, serializer, 0); + Request request = TripleClientInvoker.buildRequest(sofaRequest, serialization, serializer, 0); Context context = Context.current().withValue(TracingContextKey.getKeySofaRequest(), sofaRequest); context.attach(); return request; diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ClientRequest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ClientRequest.java index 731cc5aa4..9da046c09 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ClientRequest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ClientRequest.java @@ -17,17 +17,17 @@ package com.alipay.sofa.rpc.test.triple.stream; public class ClientRequest { - private String meg; + private String msg; private int count; - public ClientRequest(String meg, int count) { - this.meg = meg; + public ClientRequest(String msg, int count) { + this.msg = msg; this.count = count; } public String getMsg() { - return meg; + return msg; } public int getCount() { diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ExtendClientRequest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ExtendClientRequest.java index 2a309fe26..cd5adee15 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ExtendClientRequest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/ExtendClientRequest.java @@ -24,8 +24,8 @@ public class ExtendClientRequest extends ClientRequest { private String extendString; - public ExtendClientRequest(String meg, int count, String extendString) { - super(meg, count); + public ExtendClientRequest(String msg, int count, String extendString) { + super(msg, count); this.extendString = extendString; } diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloService.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloService.java index 8f8c280cd..6b51cf23b 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloService.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloService.java @@ -22,12 +22,12 @@ public interface HelloService { String CMD_TRIGGER_STREAM_FINISH = "finish"; - String CMD_TRIGGER_STEAM_ERROR = "error"; + String CMD_TRIGGER_STREAM_ERROR = "error"; String ERROR_MSG = "error msg"; StreamHandler sayHelloBiStream(StreamHandler streamHandler); - void sayHelloServerStream(StreamHandler streamHandler, ClientRequest clientRequest); + void sayHelloServerStream(ClientRequest clientRequest, StreamHandler streamHandler); } diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloServiceImpl.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloServiceImpl.java index 2527b78fa..58072e9d4 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloServiceImpl.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/HelloServiceImpl.java @@ -33,7 +33,7 @@ public void onMessage(ClientRequest clientRequest) { LOGGER.info("bi stream req onMessage"); if (clientRequest.getMsg().equals(CMD_TRIGGER_STREAM_FINISH)) { streamHandler.onFinish(); - } else if (clientRequest.getMsg().equals(CMD_TRIGGER_STEAM_ERROR)) { + } else if (clientRequest.getMsg().equals(CMD_TRIGGER_STREAM_ERROR)) { streamHandler.onException(new RuntimeException(ERROR_MSG)); } else { if (clientRequest instanceof ExtendClientRequest) { @@ -61,7 +61,7 @@ public void onException(Throwable throwable) { } @Override - public void sayHelloServerStream(StreamHandler streamHandler, ClientRequest clientRequest) { + public void sayHelloServerStream(ClientRequest clientRequest, StreamHandler streamHandler) { LOGGER.info("server stream req receive"); streamHandler.onMessage(new ServerResponse(clientRequest.getMsg(), clientRequest.getCount())); streamHandler.onMessage(new ExtendServerResponse(clientRequest.getMsg(), clientRequest.getCount() + 1, @@ -70,7 +70,7 @@ public void sayHelloServerStream(StreamHandler streamHandler, Cl streamHandler.onMessage(new ExtendServerResponse(clientRequest.getMsg(), clientRequest.getCount() + 3, "extendString")); streamHandler.onMessage(new ServerResponse(clientRequest.getMsg(), clientRequest.getCount() + 4)); - if (clientRequest.getMsg().equals(CMD_TRIGGER_STEAM_ERROR)) { + if (clientRequest.getMsg().equals(CMD_TRIGGER_STREAM_ERROR)) { streamHandler.onException(new RuntimeException(ERROR_MSG)); } else { streamHandler.onFinish(); diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/TripleGenericStreamTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/TripleGenericStreamTest.java index b77ea5013..504b52b04 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/TripleGenericStreamTest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/stream/TripleGenericStreamTest.java @@ -58,7 +58,7 @@ public static void beforeClass() throws InterruptedException { RpcRunningState.setUnitTestMode(true); ServerConfig serverConfig = new ServerConfig() .setProtocol("tri") - .setPort(12200) + .setPort(50066) .setDaemon(false); helloServiceInst = Mockito.spy(new HelloServiceImpl()); @@ -71,7 +71,7 @@ public static void beforeClass() throws InterruptedException { consumerConfig = new ConsumerConfig() .setInterfaceId(HelloService.class.getName()) .setProtocol("tri") - .setDirectUrl("triple://127.0.0.1:12200"); + .setDirectUrl("triple://127.0.0.1:50066"); helloServiceRef = consumerConfig.refer(); } @@ -147,7 +147,7 @@ public void onException(Throwable throwable) { Assert.assertFalse(receivedException.get()); Assert.assertThrows(Throwable.class, () -> streamHandler.onMessage(new ClientRequest("", 123))); } else { - streamHandler.onMessage(new ClientRequest(HelloService.CMD_TRIGGER_STEAM_ERROR, -2)); + streamHandler.onMessage(new ClientRequest(HelloService.CMD_TRIGGER_STREAM_ERROR, -2)); Assert.assertTrue(countDownLatch.await(20, TimeUnit.SECONDS)); streamHandler.onException(new RuntimeException(HelloService.ERROR_MSG)); Assert.assertThrows(Throwable.class, () -> streamHandler.onMessage(new ClientRequest(HELLO_MSG, 0))); @@ -169,8 +169,6 @@ public void testTripleServerStreamException() throws InterruptedException { public void testTripleServerStream(boolean endWithException) throws InterruptedException { reset(helloServiceInst); - HelloService helloServiceRef = consumerConfig.refer(); - Thread.sleep(5000); AtomicInteger count = new AtomicInteger(0); int responseTimes = 5; CountDownLatch countDownLatch = new CountDownLatch(responseTimes + 1); @@ -178,10 +176,10 @@ public void testTripleServerStream(boolean endWithException) throws InterruptedE AtomicBoolean responseException = new AtomicBoolean(false); List serverResponseList = new ArrayList<>(); - helloServiceRef.sayHelloServerStream(new StreamHandler() { + helloServiceRef.sayHelloServerStream(new ClientRequest(endWithException ? HelloService.CMD_TRIGGER_STREAM_ERROR : HELLO_MSG, 0), new StreamHandler() { @Override public void onMessage(ServerResponse message) { - Assert.assertEquals(endWithException ? HelloService.CMD_TRIGGER_STEAM_ERROR : HELLO_MSG, + Assert.assertEquals(endWithException ? HelloService.CMD_TRIGGER_STREAM_ERROR : HELLO_MSG, message.getMsg()); Assert.assertEquals(count.getAndIncrement(), message.getCount()); serverResponseList.add(message); @@ -200,7 +198,7 @@ public void onException(Throwable throwable) { responseException.set(true); countDownLatch.countDown(); } - }, new ClientRequest(endWithException ? HelloService.CMD_TRIGGER_STEAM_ERROR : HELLO_MSG, 0)); + }); Assert.assertTrue(countDownLatch.await(20, TimeUnit.SECONDS)); if (endWithException) {