Skip to content

Commit

Permalink
fix triple multi classloader switch problem (#1278)
Browse files Browse the repository at this point in the history
* fix triple multi classloader switch problem

* fix triple client and server log uniqueName without version info

* fix triple client and server log uniqueName without version info

* fix triple client and server log uniqueName without version info

* add two uniqueId case

* feat: tri test case 4 ark

Co-authored-by: liujianjun.ljj <liujianjun.ljj@antgroup.com>
Co-authored-by: 均源 <zhangminglun.zml@antgroup.com>
  • Loading branch information
3 people authored Jan 5, 2023
1 parent e203f2b commit 9eff2f6
Show file tree
Hide file tree
Showing 12 changed files with 598 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static void main(String[] args) {
.setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE)
//.setRegistry(registryConfig)
.setApplication(clientApp)
.setDirectUrl("tri://10.15.232.18:19544")
.setDirectUrl("tri://127.0.0.1:50051")
.setRegister(false);

SofaGreeterTriple.IGreeter greeterBlockingStub = consumerConfig.refer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,13 @@

import com.alipay.sofa.rpc.codec.Serializer;
import com.alipay.sofa.rpc.codec.SerializerFactory;
import com.alipay.sofa.rpc.common.cache.ReflectCache;
import com.alipay.sofa.rpc.common.utils.ClassTypeUtils;
import com.alipay.sofa.rpc.common.utils.ClassUtils;
import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.invoke.Invoker;
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
import com.alipay.sofa.rpc.tracer.sofatracer.TracingContextKey;
Expand All @@ -52,18 +48,11 @@ public class GenericServiceImpl extends SofaGenericServiceTriple.GenericServiceI

private static final Logger LOGGER = LoggerFactory.getLogger(GenericServiceImpl.class);

protected Invoker invoker;
protected ProviderConfig providerConfig;
protected UniqueIdInvoker invoker;

public GenericServiceImpl(Invoker invoker, ProviderConfig providerConfig) {
public GenericServiceImpl(UniqueIdInvoker invoker) {
super();
this.invoker = invoker;
this.providerConfig = providerConfig;
String key = ConfigUniqueNameGenerator.getUniqueName(providerConfig);
// 缓存接口的方法
for (Method m : providerConfig.getProxyClass().getMethods()) {
ReflectCache.putOverloadMethodCache(key, m);
}
}

@Override
Expand All @@ -74,15 +63,16 @@ public void generic(Request request, StreamObserver<Response> responseObserver)
SofaRequest sofaRequest = TracingContextKey.getKeySofaRequest().get(Context.current());
String methodName = sofaRequest.getMethodName();
try {
String key = ConfigUniqueNameGenerator.getUniqueName(providerConfig);
ClassLoader interfaceClassLoader = providerConfig.getProxyClass().getClassLoader();
Thread.currentThread().setContextClassLoader(interfaceClassLoader);
ClassLoader serviceClassLoader = invoker.getServiceClassLoader(sofaRequest);
Thread.currentThread().setContextClassLoader(serviceClassLoader);

Method declaredMethod = invoker.getDeclaredMethod(sofaRequest, request);
if (declaredMethod == null) {
throw new SofaRpcException(RpcErrorType.SERVER_NOT_FOUND_INVOKER, "Cannot find invoke method " +
methodName);
}
Class[] argTypes = getArgTypes(request);
Serializer serializer = SerializerFactory.getSerializer(request.getSerializeType());

Method declaredMethod = ReflectCache.getOverloadMethodCache(key, methodName, request.getArgTypesList()
.toArray(new String[0]));
Object[] invokeArgs = getInvokeArgs(request, argTypes, serializer);

// fill sofaRequest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,14 @@ public void stop() {

@Override
public void registerProcessor(ProviderConfig providerConfig, Invoker instance) {
Object ref = providerConfig.getRef();
this.lock.lock();
try {
String key = ConfigUniqueNameGenerator.getUniqueName(providerConfig);
ReflectCache.registerServiceClassLoader(key, providerConfig.getProxyClass().getClassLoader());
// 缓存接口的方法
for (Method m : providerConfig.getProxyClass().getMethods()) {
ReflectCache.putOverloadMethodCache(key, m);
}
// wrap invoker to support unique id
UniqueIdInvoker oldInvoker = this.invokerMap.putIfAbsent(providerConfig.getInterfaceId(), new UniqueIdInvoker());
if (null != oldInvoker) {
Expand All @@ -261,22 +266,7 @@ public void registerProcessor(ProviderConfig providerConfig, Invoker instance) {
throw new IllegalStateException("Can not expose service with interface:" + providerConfig.getInterfaceId() + " and unique id: " + providerConfig.getUniqueId());
}

// create service definition
ServerServiceDefinition serviceDef;
if (SofaProtoUtils.isProtoClass(ref)) {
// refer is BindableService
this.setBindableProxiedImpl(providerConfig, uniqueIdInvoker);
BindableService bindableService = (BindableService) providerConfig.getRef();
serviceDef = bindableService.bindService();
} else {
GenericServiceImpl genericService = new GenericServiceImpl(uniqueIdInvoker, providerConfig);
genericService.setProxiedImpl(genericService);
serviceDef = buildSofaServiceDef(genericService, providerConfig);
}

List<TripleServerInterceptor> interceptorList = buildInterceptorChain(serviceDef);
ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(
serviceDef, interceptorList);
ServerServiceDefinition serviceDefinition = getServerServiceDefinition(providerConfig, uniqueIdInvoker);
this.serviceInfo.put(providerConfig, serviceDefinition);
ServerServiceDefinition ssd = this.handlerRegistry.addService(serviceDefinition);
if (ssd != null) {
Expand All @@ -294,6 +284,26 @@ public void registerProcessor(ProviderConfig providerConfig, Invoker instance) {
}
}

private ServerServiceDefinition getServerServiceDefinition(ProviderConfig providerConfig, UniqueIdInvoker uniqueIdInvoker) {
// create service definition
ServerServiceDefinition serviceDef;
if (SofaProtoUtils.isProtoClass(providerConfig.getRef())) {
// refer is BindableService
this.setBindableProxiedImpl(providerConfig, uniqueIdInvoker);
BindableService bindableService = (BindableService) providerConfig.getRef();
serviceDef = bindableService.bindService();
} else {
GenericServiceImpl genericService = new GenericServiceImpl(uniqueIdInvoker);
genericService.setProxiedImpl(genericService);
serviceDef = buildSofaServiceDef(genericService, providerConfig);
}

List<TripleServerInterceptor> interceptorList = buildInterceptorChain(serviceDef);
ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(
serviceDef, interceptorList);
return serviceDefinition;
}

private void setBindableProxiedImpl(ProviderConfig providerConfig, Invoker invoker) {
Class<?> implClass = providerConfig.getRef().getClass();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@
*/
package com.alipay.sofa.rpc.server.triple;

import com.alipay.sofa.rpc.common.cache.ReflectCache;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.invoke.Invoker;
import com.alipay.sofa.rpc.server.ProviderProxyInvoker;
import triple.Request;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -164,4 +169,33 @@ private Invoker findInvoker(String interfaceName, String uniqueId) {
}
}

public ClassLoader getServiceClassLoader(SofaRequest sofaRequest) {
String uniqueName = this.getServiceUniqueName(sofaRequest);
return ReflectCache.getServiceClassLoader(uniqueName);
}

public Method getDeclaredMethod(SofaRequest sofaRequest, Request request) {
String uniqueName = this.getServiceUniqueName(sofaRequest);
return ReflectCache.getOverloadMethodCache(uniqueName, sofaRequest.getMethodName(), request
.getArgTypesList()
.toArray(new String[0]));
}

private String getServiceUniqueName(SofaRequest sofaRequest) {
this.readLock.lock();
try {
Invoker invoker = this.findInvoker(sofaRequest.getInterfaceName(), getUniqueIdFromInvokeContext());
ProviderConfig providerConfig = null;
if (invoker instanceof ProviderProxyInvoker) {
providerConfig = ((ProviderProxyInvoker) invoker).getProviderConfig();
}
if (providerConfig == null) {
throw new SofaRpcException(RpcErrorType.SERVER_NOT_FOUND_INVOKER, "Cannot find providerConfig");
}
return ConfigUniqueNameGenerator.getUniqueName(providerConfig);
} finally {
this.readLock.unlock();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import com.alipay.sofa.rpc.codec.Serializer;
import com.alipay.sofa.rpc.codec.SerializerFactory;
import com.alipay.sofa.rpc.codec.sofahessian.SofaHessianSerializer;
import com.alipay.sofa.rpc.common.cache.ReflectCache;
import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.filter.ProviderInvoker;
import com.alipay.sofa.rpc.message.MessageBuilder;
import com.alipay.sofa.rpc.server.ProviderProxyInvoker;
import com.alipay.sofa.rpc.tracer.sofatracer.TracingContextKey;
import com.alipay.sofa.rpc.transport.ByteArrayWrapperByteBuf;
import com.alipay.sofa.rpc.transport.triple.TripleClientInvoker;
Expand Down Expand Up @@ -53,8 +54,16 @@ public GenericServiceImplTest(){
providerConfig = new ProviderConfig<>();
providerConfig.setRef(new HelloServiceImpl());
providerConfig.setInterfaceId(HelloService.class.getName());
ProviderInvoker<HelloService> invoker = new ProviderInvoker<>(providerConfig);
genericService = new GenericServiceImpl(invoker,providerConfig);
String key = ConfigUniqueNameGenerator.getUniqueName(providerConfig);
ReflectCache.registerServiceClassLoader(key, providerConfig.getProxyClass().getClassLoader());
// 缓存接口的方法
for (Method m : providerConfig.getProxyClass().getMethods()) {
ReflectCache.putOverloadMethodCache(key, m);
}
ProviderProxyInvoker invoker = new ProviderProxyInvoker(providerConfig);
UniqueIdInvoker uniqueIdInvoker = new UniqueIdInvoker();
uniqueIdInvoker.registerInvoker(providerConfig, invoker);
genericService = new GenericServiceImpl(uniqueIdInvoker);
responseObserver = new MockStreamObserver<>();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.test;

/**
*
* @author <a href=mailto:zhanggeng.zg@antfin.com>GengZhang</a>
*/
public class AnotherHelloServiceImpl implements HelloService {

private int sleep;

private String result;

public AnotherHelloServiceImpl() {

}

public AnotherHelloServiceImpl(String result) {
this.result = result;
}

public AnotherHelloServiceImpl(int sleep) {
this.sleep = sleep;
}

@Override
public String sayHello(String name, int age) {
if (sleep > 0) {
try {
Thread.sleep(sleep);
} catch (Exception ignore) {
}
}
return result != null ? result : "hello " + name + " from server! age: " + age;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.test.triple;

import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.examples.helloworld.SofaGreeterTriple;
import io.grpc.stub.StreamObserver;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
* @author Even
* @date 2023/1/3 12:17 AM
*/
public class GreeterImpl2 extends SofaGreeterTriple.GreeterImplBase {

//Intentionally using unsupported format
static final DateTimeFormatter[] datetimeFormatter = new DateTimeFormatter[] { DateTimeFormatter.ISO_DATE_TIME,
DateTimeFormatter.ISO_LOCAL_DATE_TIME,
DateTimeFormatter.BASIC_ISO_DATE };

@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
HelloRequest.DateTime reqDateTime = req.getDateTime();
int i = 0;
try {
i = Integer.parseInt(reqDateTime.getTime());
} catch (Exception e) {
//TODO: handle exception
}
LocalDateTime dt = LocalDateTime.now();
String dtStr = dt.format(datetimeFormatter[i % datetimeFormatter.length]);
HelloRequest.DateTime rplyDateTime = HelloRequest.DateTime.newBuilder(reqDateTime)
.setDate(dtStr).build();
HelloReply reply = HelloReply.newBuilder()
.setMessage("Hello2 " + req.getName())
.setDateTime(rplyDateTime)
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,55 @@ public void testSyncTimeout() {
Assert.assertTrue(exp);
}

@Test
public void testExposeTwoUniqueId() {
ApplicationConfig applicationConfig = new ApplicationConfig().setAppName("triple-server1");

int port = 50052;

ServerConfig serverConfig = new ServerConfig()
.setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE)
.setPort(port);

ProviderConfig<SofaGreeterTriple.IGreeter> providerConfig = new ProviderConfig<SofaGreeterTriple.IGreeter>()
.setApplication(applicationConfig)
.setUniqueId("abc")
.setBootstrap(RpcConstants.PROTOCOL_TYPE_TRIPLE)
.setInterfaceId(SofaGreeterTriple.IGreeter.class.getName())
.setRef(new GreeterImpl())
.setServer(serverConfig);

providerConfig.export();

ProviderConfig<SofaGreeterTriple.IGreeter> providerConfig2 = new ProviderConfig<SofaGreeterTriple.IGreeter>()
.setApplication(applicationConfig)
.setUniqueId("abcd")
.setBootstrap(RpcConstants.PROTOCOL_TYPE_TRIPLE)
.setInterfaceId(SofaGreeterTriple.IGreeter.class.getName())
.setRef(new GreeterImpl2())
.setServer(serverConfig);
providerConfig2.export();

ConsumerConfig<SofaGreeterTriple.IGreeter> consumerConfig = new ConsumerConfig<>();
consumerConfig.setInterfaceId(SofaGreeterTriple.IGreeter.class.getName())
.setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE)
.setUniqueId("abc")
.setDirectUrl("tri://127.0.0.1:" + port);
SofaGreeterTriple.IGreeter greeterBlockingStub = consumerConfig.refer();
HelloRequest.DateTime dateTime = HelloRequest.DateTime.newBuilder().setDate("2018-12-28").setTime("11:13:00")
.build();
HelloRequest request = HelloRequest.newBuilder().setName("world").setDateTime(dateTime).build();
Assert.assertEquals("Hello world", greeterBlockingStub.sayHello(request).getMessage());

ConsumerConfig<SofaGreeterTriple.IGreeter> consumerConfig2 = new ConsumerConfig<>();
consumerConfig2.setInterfaceId(SofaGreeterTriple.IGreeter.class.getName())
.setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE)
.setUniqueId("abcd")
.setDirectUrl("tri://127.0.0.1:" + port);
SofaGreeterTriple.IGreeter greeterBlockingStub2 = consumerConfig2.refer();
Assert.assertEquals("Hello2 world", greeterBlockingStub2.sayHello(request).getMessage());
}

@BeforeClass
public static void adBeforeClass() {
RpcRunningState.setUnitTestMode(true);
Expand Down
Loading

0 comments on commit 9eff2f6

Please sign in to comment.