From 1e386ed9be503ed9b38c8be04de406603888839f Mon Sep 17 00:00:00 2001 From: evenliu Date: Mon, 2 Jan 2023 17:40:51 +0800 Subject: [PATCH 01/11] fix unit test (#1291) Co-authored-by: liujianjun.ljj --- .../alipay/sofa/rpc/test/triple/TripleAsyncInvokeTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/TripleAsyncInvokeTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/TripleAsyncInvokeTest.java index 13b6d7104..c5f78c238 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/TripleAsyncInvokeTest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/TripleAsyncInvokeTest.java @@ -51,13 +51,14 @@ public class TripleAsyncInvokeTest { @BeforeClass public static void start() { ServerConfig serverConfig2 = new ServerConfig() - .setPort(22223) + .setPort(50053) .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) .setDaemon(false); // 服务端 ProviderConfig CProvider = new ProviderConfig() .setInterfaceId(HelloService.class.getName()) + .setBootstrap(RpcConstants.PROTOCOL_TYPE_TRIPLE) .setRef(new HelloServiceImpl(1000)) .setServer(serverConfig2); CProvider.export(); @@ -68,7 +69,7 @@ public static void start() { .setInvokeType(RpcConstants.INVOKER_TYPE_FUTURE) .setTimeout(5000) .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) - .setDirectUrl("127.0.0.1:22223"); + .setDirectUrl("127.0.0.1:50053"); future = BConsumer.refer(); ConsumerConfig BBConsumer = new ConsumerConfig() @@ -76,7 +77,7 @@ public static void start() { .setInvokeType(RpcConstants.INVOKER_TYPE_CALLBACK) .setTimeout(5000) .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) - .setDirectUrl("127.0.0.1:22223"); + .setDirectUrl("127.0.0.1:50053"); callback = BBConsumer.refer(); } From 8b6b7c763742b9e04ab0e73b3cc09e9145edd63d Mon Sep 17 00:00:00 2001 From: rickey17 Date: Tue, 3 Jan 2023 19:15:44 +0800 Subject: [PATCH 02/11] fix getDeserializerForCustomThrowable performance (#1288) * fix getDeserializerForCustomThrowable performance * code format for new file * fix ut errr and remove the cache map * fix GenericBenchmarkTest UT Co-authored-by: xingqi.xq Co-authored-by: liujianjun.ljj --- ...tipleClassLoaderSofaSerializerFactory.java | 9 +- ...ingleClassLoaderSofaSerializerFactory.java | 9 +- ...eClassLoaderSofaSerializerFactoryTest.java | 2 + ...eClassLoaderSofaSerializerFactoryTest.java | 2 + .../GenericCustomThrowableDeterminerTest.java | 28 +++ .../SofaResponseHessianSerializerTest.java | 3 + .../test/generic/GenericBenchmarkTest.java | 200 ++++++++++++++++++ 7 files changed, 245 insertions(+), 8 deletions(-) create mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/test/generic/GenericBenchmarkTest.java diff --git a/codec/codec-sofa-hessian/src/main/java/com/alipay/sofa/rpc/codec/sofahessian/GenericMultipleClassLoaderSofaSerializerFactory.java b/codec/codec-sofa-hessian/src/main/java/com/alipay/sofa/rpc/codec/sofahessian/GenericMultipleClassLoaderSofaSerializerFactory.java index 2c23081db..7993f7686 100644 --- a/codec/codec-sofa-hessian/src/main/java/com/alipay/sofa/rpc/codec/sofahessian/GenericMultipleClassLoaderSofaSerializerFactory.java +++ b/codec/codec-sofa-hessian/src/main/java/com/alipay/sofa/rpc/codec/sofahessian/GenericMultipleClassLoaderSofaSerializerFactory.java @@ -86,15 +86,16 @@ public Deserializer getDeserializer(String type) throws HessianProtocolException return super.getDeserializer(type); } - // 自定义Throwable采用JavaDeserializer,反序列化成Throwable而不是GenericObject - Deserializer deserializer = getDeserializerForCustomThrowable(type); + // 查看是否已经包含反序列化器 + Deserializer deserializer = DESERIALIZER_MAP.get(type); if (deserializer != null) { return deserializer; } - // 查看是否已经包含反序列化器 - deserializer = DESERIALIZER_MAP.get(type); + // 自定义Throwable采用JavaDeserializer,反序列化成Throwable而不是GenericObject + deserializer = getDeserializerForCustomThrowable(type); if (deserializer != null) { + DESERIALIZER_MAP.putIfAbsent(type, deserializer); return deserializer; } diff --git a/codec/codec-sofa-hessian/src/main/java/com/alipay/sofa/rpc/codec/sofahessian/GenericSingleClassLoaderSofaSerializerFactory.java b/codec/codec-sofa-hessian/src/main/java/com/alipay/sofa/rpc/codec/sofahessian/GenericSingleClassLoaderSofaSerializerFactory.java index 8409243d6..92e467424 100644 --- a/codec/codec-sofa-hessian/src/main/java/com/alipay/sofa/rpc/codec/sofahessian/GenericSingleClassLoaderSofaSerializerFactory.java +++ b/codec/codec-sofa-hessian/src/main/java/com/alipay/sofa/rpc/codec/sofahessian/GenericSingleClassLoaderSofaSerializerFactory.java @@ -86,15 +86,16 @@ public Deserializer getDeserializer(String type) throws HessianProtocolException return super.getDeserializer(type); } - // 自定义Throwable采用JavaDeserializer,反序列化成Throwable而不是GenericObject - Deserializer deserializer = getDeserializerForCustomThrowable(type); + // 查看是否已经包含反序列化器 + Deserializer deserializer = DESERIALIZER_MAP.get(type); if (deserializer != null) { return deserializer; } - // 查看是否已经包含反序列化器 - deserializer = DESERIALIZER_MAP.get(type); + // 自定义Throwable采用JavaDeserializer,反序列化成Throwable而不是GenericObject + deserializer = getDeserializerForCustomThrowable(type); if (deserializer != null) { + DESERIALIZER_MAP.putIfAbsent(type, deserializer); return deserializer; } diff --git a/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/GenericMultipleClassLoaderSofaSerializerFactoryTest.java b/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/GenericMultipleClassLoaderSofaSerializerFactoryTest.java index 1b0584d6e..4dc8693f2 100644 --- a/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/GenericMultipleClassLoaderSofaSerializerFactoryTest.java +++ b/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/GenericMultipleClassLoaderSofaSerializerFactoryTest.java @@ -38,6 +38,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import static com.alipay.sofa.rpc.codec.sofahessian.serialize.GenericCustomThrowableDeterminerTest.clearCacheDeserializerMap; import static com.alipay.sofa.rpc.codec.sofahessian.serialize.GenericCustomThrowableDeterminerTest.setGenericThrowException; /** @@ -109,6 +110,7 @@ public void testCustomThrowableDeserializerEnabled() throws Exception { Assert.assertEquals("MockError", ((MockError) result).getMessage()); } finally { setGenericThrowException(false); + clearCacheDeserializerMap(); } } } \ No newline at end of file diff --git a/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/GenericSingleClassLoaderSofaSerializerFactoryTest.java b/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/GenericSingleClassLoaderSofaSerializerFactoryTest.java index 05eb80c34..66309ea47 100644 --- a/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/GenericSingleClassLoaderSofaSerializerFactoryTest.java +++ b/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/GenericSingleClassLoaderSofaSerializerFactoryTest.java @@ -39,6 +39,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import static com.alipay.sofa.rpc.codec.sofahessian.serialize.GenericCustomThrowableDeterminerTest.clearCacheDeserializerMap; import static com.alipay.sofa.rpc.codec.sofahessian.serialize.GenericCustomThrowableDeterminerTest.setGenericThrowException; /** @@ -116,6 +117,7 @@ public void testCustomThrowableDeserializerEnabled() throws Exception { Assert.assertEquals("MockError", ((MockError) result).getMessage()); } finally { setGenericThrowException(false); + clearCacheDeserializerMap(); } } } \ No newline at end of file diff --git a/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/serialize/GenericCustomThrowableDeterminerTest.java b/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/serialize/GenericCustomThrowableDeterminerTest.java index d7d23fd43..71fe1e791 100644 --- a/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/serialize/GenericCustomThrowableDeterminerTest.java +++ b/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/serialize/GenericCustomThrowableDeterminerTest.java @@ -17,11 +17,15 @@ package com.alipay.sofa.rpc.codec.sofahessian.serialize; import com.alipay.hessian.generic.model.GenericObject; +import com.alipay.sofa.rpc.codec.sofahessian.GenericMultipleClassLoaderSofaSerializerFactory; +import com.alipay.sofa.rpc.codec.sofahessian.GenericSingleClassLoaderSofaSerializerFactory; +import com.caucho.hessian.io.Deserializer; import org.junit.Assert; import org.junit.Test; import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.util.concurrent.ConcurrentHashMap; import static com.alipay.sofa.rpc.codec.sofahessian.serialize.GenericCustomThrowableDeterminer.judgeCustomThrowableForGenericObject; @@ -66,4 +70,28 @@ public static void setGenericThrowException(boolean enabled) { e.printStackTrace(); } } + + public static void clearCacheDeserializerMap() { + try { + Field field = GenericMultipleClassLoaderSofaSerializerFactory.class.getDeclaredField("DESERIALIZER_MAP"); + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + field.set(null, new ConcurrentHashMap()); + } catch (Exception e) { + e.printStackTrace(); + } + + try { + Field field = GenericSingleClassLoaderSofaSerializerFactory.class.getDeclaredField("DESERIALIZER_MAP"); + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + field.set(null, new ConcurrentHashMap()); + } catch (Exception e) { + e.printStackTrace(); + } + } } \ No newline at end of file diff --git a/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/serialize/SofaResponseHessianSerializerTest.java b/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/serialize/SofaResponseHessianSerializerTest.java index 59d5cf277..ea6ff7f6c 100644 --- a/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/serialize/SofaResponseHessianSerializerTest.java +++ b/codec/codec-sofa-hessian/src/test/java/com/alipay/sofa/rpc/codec/sofahessian/serialize/SofaResponseHessianSerializerTest.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.Map; +import static com.alipay.sofa.rpc.codec.sofahessian.serialize.GenericCustomThrowableDeterminerTest.clearCacheDeserializerMap; import static com.alipay.sofa.rpc.codec.sofahessian.serialize.GenericCustomThrowableDeterminerTest.setGenericThrowException; /** @@ -89,6 +90,7 @@ public void testCustomThrowableDeserializerEnabled() throws Exception { Assert.assertEquals("MockError", ((MockError) sofaResponse2.getAppResponse()).getMessage()); } finally { setGenericThrowException(false); + clearCacheDeserializerMap(); } } @@ -117,6 +119,7 @@ public void testCustomThrowableDeserializerEnabledForIncompatible() throws Excep + "found, error: ")); } finally { setGenericThrowException(false); + clearCacheDeserializerMap(); } } diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/generic/GenericBenchmarkTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/generic/GenericBenchmarkTest.java new file mode 100644 index 000000000..34ac20993 --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/generic/GenericBenchmarkTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.generic; + +import com.alipay.hessian.generic.model.GenericObject; +import com.alipay.sofa.rpc.api.GenericService; +import com.alipay.sofa.rpc.codec.sofahessian.GenericMultipleClassLoaderSofaSerializerFactory; +import com.alipay.sofa.rpc.codec.sofahessian.GenericSingleClassLoaderSofaSerializerFactory; +import com.alipay.sofa.rpc.codec.sofahessian.serialize.GenericCustomThrowableDeterminer; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.core.exception.SofaRpcException; +import com.alipay.sofa.rpc.test.ActivelyDestroyTest; +import com.alipay.sofa.rpc.test.generic.bean.Job; +import com.alipay.sofa.rpc.test.generic.bean.People; +import com.caucho.hessian.io.Deserializer; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertEquals; + +/** + * @author xuanbei + * @since 2016/07/28 + */ +public class GenericBenchmarkTest extends ActivelyDestroyTest { + @Test + public void testAll() throws SofaRpcException, InterruptedException { + try { + // 发布服务 + ServerConfig serverConfig2 = new ServerConfig() + .setPort(21234) + .setCoreThreads(500) + .setMaxThreads(500) + .setDaemon(false); + + // C服务的服务端 + ProviderConfig CProvider = new ProviderConfig() + .setInterfaceId(TestInterface.class.getName()) + .setRef(new TestClass()) + .setServer(serverConfig2); + CProvider.export(); + + // 引用服务 + ConsumerConfig BConsumer = new ConsumerConfig() + .setInterfaceId(TestInterface.class.getName()) + .setGeneric(true) + .setDirectUrl("bolt://127.0.0.1:21234") + .setTimeout(3000) + .setRetries(2); + GenericService proxy = BConsumer.refer(); + + setGenericThrowException(false); + clearCacheDeserializerMap(); + benchmark(proxy, 10, 1000); + + setGenericThrowException(true); + clearCacheDeserializerMap(); + benchmark(proxy, 10, 1000); + + setGenericThrowException(false); + clearCacheDeserializerMap(); + long[] disabledData = benchmark(proxy, 10, 1000); + + setGenericThrowException(true); + clearCacheDeserializerMap(); + long[] enabledData = benchmark(proxy, 10, 1000); + + System.out.println("disabledData===>" + Arrays.toString(disabledData)); + System.out.println(" enabledData===>" + Arrays.toString(enabledData)); + Assert.assertEquals(disabledData[0], enabledData[0]); //count + Assert.assertTrue(disabledData[1] - enabledData[1] <= 2); //P50 + Assert.assertTrue(disabledData[2] - enabledData[2] <= 5); //P90 + Assert.assertTrue(disabledData[3] - enabledData[3] <= 10); //P99 + } finally { + setGenericThrowException(false); + clearCacheDeserializerMap(); + } + } + + private long[] benchmark(GenericService proxy, int threadNum, int countNum) throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(threadNum); + + List synchronizedList = Collections.synchronizedList(new ArrayList<>(threadNum * countNum / 2)); + + for (int i = 0; i < threadNum; i++) { + new Thread(new Runnable() { + @Override + public void run() { + for (int i = 0; i < countNum; i++) { + long start = System.currentTimeMillis(); + GenericObject genericObject = new GenericObject( + "com.alipay.sofa.rpc.test.generic.bean.People"); + genericObject.putField("name", "Lilei"); + genericObject.putField("job", new Job("coder")); + People people = new People(); + people.setName("Lilei"); + people.setJob(new Job("coder")); + + // sync 调用 + assertEquals(proxy.$invoke("hello", + new String[] {"com.alipay.sofa.rpc.test.generic.bean.People"}, + new Object[] {people}), new TestClass().hello(people)); + + People peopleResult = proxy.$genericInvoke("hello", + new String[] {"com.alipay.sofa.rpc.test.generic.bean.People"}, + new Object[] {genericObject}, People.class); + + assertEquals(peopleResult, new TestClass().hello(people)); + + GenericObject result = (GenericObject) proxy.$genericInvoke("hello", + new String[] {"com.alipay.sofa.rpc.test.generic.bean.People"}, + new Object[] {genericObject}); + isCorrect(result); + + synchronizedList.add(System.currentTimeMillis() - start); + } + countDownLatch.countDown(); + } + }).start(); + } + + countDownLatch.await(); + + Collections.sort(synchronizedList); + int size = synchronizedList.size(); + return new long[] {synchronizedList.size(), synchronizedList.get(size / 2), synchronizedList.get(size * 9 / 10), + synchronizedList.get(size * 99 / 100)}; + } + + private void isCorrect(GenericObject result) { + Assert.assertEquals(result.getType(), "com.alipay.sofa.rpc.test.generic.bean.People"); + Assert.assertEquals(result.getField("name"), "Lilei"); + GenericObject genericObject = (GenericObject) result.getField("job"); + Assert.assertEquals(genericObject.getType(), "com.alipay.sofa.rpc.test.generic.bean.Job"); + Assert.assertEquals(genericObject.getField("name"), "coder"); + } + + public static void setGenericThrowException(boolean enabled) { + try { + Field field = GenericCustomThrowableDeterminer.class.getDeclaredField("GENERIC_THROW_EXCEPTION"); + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + field.set(null, enabled); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void clearCacheDeserializerMap() { + try { + Field field = GenericMultipleClassLoaderSofaSerializerFactory.class.getDeclaredField("DESERIALIZER_MAP"); + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + field.set(null, new ConcurrentHashMap()); + } catch (Exception e) { + e.printStackTrace(); + } + + try { + Field field = GenericSingleClassLoaderSofaSerializerFactory.class.getDeclaredField("DESERIALIZER_MAP"); + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + field.set(null, new ConcurrentHashMap()); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} From e8e19692d857da137da864437459b599bb7aef23 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 3 Jan 2023 19:16:00 +0800 Subject: [PATCH 03/11] chore(deps): bump cxf-core from 3.0.14 to 3.4.10 in /bom (#1285) Bumps cxf-core from 3.0.14 to 3.4.10. --- updated-dependencies: - dependency-name: org.apache.cxf:cxf-core dependency-type: direct:production ... Signed-off-by: dependabot[bot] Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: liujianjun.ljj --- bom/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bom/pom.xml b/bom/pom.xml index 3cf503fef..1d55ce8cf 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -18,7 +18,7 @@ 2.5 3.6.3.Final 1.0.2.Final - 3.0.14 + 3.4.10 7.5.4.v20111024 4.0.1 0.22.0 From c6697d2743463d6c8749a356a032ecccb287cc74 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 3 Jan 2023 19:16:15 +0800 Subject: [PATCH 04/11] chore(deps): bump jackson-databind from 2.9.10.8 to 2.12.7.1 in /bom (#1279) * chore(deps): bump jackson-databind from 2.9.10.8 to 2.12.7.1 in /bom Bumps [jackson-databind](https://github.com/FasterXML/jackson) from 2.9.10.8 to 2.12.7.1. - [Release notes](https://github.com/FasterXML/jackson/releases) - [Commits](https://github.com/FasterXML/jackson/commits) --- updated-dependencies: - dependency-name: com.fasterxml.jackson.core:jackson-databind dependency-type: direct:production ... Signed-off-by: dependabot[bot] * upgrade jackson version * upgrade jackson version and swagger version Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: liujianjun.ljj --- bom/pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bom/pom.xml b/bom/pom.xml index 1d55ce8cf..48d42a2d9 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -26,15 +26,15 @@ 2.0.3 5.2.0 1.2.2 - 1.5.18 + 1.6.9 7.0 27.0-jre 3.3.13 0.9.2 3.16.1 - 2.9.10 - 2.9.10.8 + 2.12.7 + 2.12.7.1 0.6.12 1.5.9 1.33.1 From a6e234b4530754fabb6d0b7718b347de08c0eaf4 Mon Sep 17 00:00:00 2001 From: jiangyuan <469391363@qq.com> Date: Tue, 3 Jan 2023 19:53:32 +0800 Subject: [PATCH 05/11] support prometheus (#1280) * support prometheus * add prometheus to sofa-rpc-all Co-authored-by: liujianjun.ljj --- all/pom.xml | 6 + bom/pom.xml | 13 + metrics/metrics-prometheus/pom.xml | 107 +++++ .../metrics/prometheus/MetricsBuilder.java | 238 ++++++++++++ .../prometheus/SofaRpcMetricsCollector.java | 367 ++++++++++++++++++ .../SofaRpcMetricsCollectorTest.java | 192 +++++++++ metrics/pom.xml | 1 + 7 files changed, 924 insertions(+) create mode 100644 metrics/metrics-prometheus/pom.xml create mode 100644 metrics/metrics-prometheus/src/main/java/com/alipay/sofa/rpc/metrics/prometheus/MetricsBuilder.java create mode 100644 metrics/metrics-prometheus/src/main/java/com/alipay/sofa/rpc/metrics/prometheus/SofaRpcMetricsCollector.java create mode 100644 metrics/metrics-prometheus/src/test/java/com/alipay/sofa/rpc/metrics/prometheus/SofaRpcMetricsCollectorTest.java diff --git a/all/pom.xml b/all/pom.xml index 639b00fdb..c268f5575 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -282,6 +282,11 @@ sofa-rpc-metrics-micrometer ${project.version} + + com.alipay.sofa + sofa-rpc-metrics-prometheus + ${project.version} + com.alipay.sofa sofa-rpc-doc-swagger @@ -513,6 +518,7 @@ com.alipay.sofa:sofa-rpc-log-common-tools com.alipay.sofa:sofa-rpc-metrics-lookout com.alipay.sofa:sofa-rpc-metrics-micrometer + com.alipay.sofa:sofa-rpc-metrics-prometheus com.alipay.sofa:sofa-rpc-registry-consul com.alipay.sofa:sofa-rpc-registry-local com.alipay.sofa:sofa-rpc-registry-zk diff --git a/bom/pom.xml b/bom/pom.xml index 48d42a2d9..092f0b831 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -29,6 +29,7 @@ 1.6.9 7.0 27.0-jre + 0.16.0 3.3.13 0.9.2 @@ -544,6 +545,18 @@ ${polaris.version} test + + + io.prometheus + simpleclient + ${prometheus.client.version} + + + io.prometheus + simpleclient_httpserver + ${prometheus.client.version} + test + diff --git a/metrics/metrics-prometheus/pom.xml b/metrics/metrics-prometheus/pom.xml new file mode 100644 index 000000000..7725ca5b7 --- /dev/null +++ b/metrics/metrics-prometheus/pom.xml @@ -0,0 +1,107 @@ + + + 4.0.0 + + + sofa-rpc-metrics + com.alipay.sofa + ${revision} + + + sofa-rpc-metrics-prometheus + + + + + com.alipay.sofa + sofa-rpc-api + + + + + io.prometheus + simpleclient + + + + + io.prometheus + simpleclient_httpserver + test + + + + junit + junit + test + + + + com.alipay.sofa + sofa-rpc-log + test + + + + + src/main/java + + + src/main/resources + false + + **/** + + + + src/test/java + + + src/test/resources + false + + **/** + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + ${maven.compiler.source} + ${maven.compiler.target} + ${project.build.sourceEncoding} + + + + org.apache.maven.plugins + maven-install-plugin + + ${module.install.skip} + + + + org.apache.maven.plugins + maven-deploy-plugin + + ${module.deploy.skip} + + + + org.apache.maven.plugins + maven-surefire-plugin + + ${skipTests} + + **/*Test.java + + once + + + + + \ No newline at end of file diff --git a/metrics/metrics-prometheus/src/main/java/com/alipay/sofa/rpc/metrics/prometheus/MetricsBuilder.java b/metrics/metrics-prometheus/src/main/java/com/alipay/sofa/rpc/metrics/prometheus/MetricsBuilder.java new file mode 100644 index 000000000..ae5223e70 --- /dev/null +++ b/metrics/metrics-prometheus/src/main/java/com/alipay/sofa/rpc/metrics/prometheus/MetricsBuilder.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.metrics.prometheus; + +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; +import io.prometheus.client.Histogram; + +public class MetricsBuilder { + public static final String BYTES = "bytes"; + + public static final String TASKS = "tasks"; + + public static final String THREADS = "threads"; + + private Histogram.Builder clientTotalBuilder = Histogram.build(); + + private Histogram.Builder clientFailBuilder = Histogram.build(); + + private Histogram.Builder serverTotalBuilder = Histogram.build(); + + private Histogram.Builder serverFailBuilder = Histogram.build(); + + private Histogram.Builder requestSizeBuilder = Histogram.build(); + + private Histogram.Builder responseSizeBuilder = Histogram.build(); + + private Counter.Builder providerCounterBuilder = Counter.build(); + + private Counter.Builder consumerCounterBuilder = Counter.build(); + + private Gauge.Builder threadPoolConfigCoreBuilder = Gauge.build(); + + private Gauge.Builder threadPoolConfigMaxBuilder = Gauge.build(); + + private Gauge.Builder threadPoolConfigQueueBuilder = Gauge.build(); + + private Gauge.Builder threadPoolActiveBuilder = Gauge.build(); + + private Gauge.Builder threadPoolIdleBuilder = Gauge.build(); + + private Gauge.Builder threadPoolQueueBuilder = Gauge.build(); + + public Histogram.Builder getClientTotalBuilder() { + return clientTotalBuilder; + } + + public Histogram.Builder getClientFailBuilder() { + return clientFailBuilder; + } + + public Histogram.Builder getServerTotalBuilder() { + return serverTotalBuilder; + } + + public Histogram.Builder getServerFailBuilder() { + return serverFailBuilder; + } + + public Histogram.Builder getRequestSizeBuilder() { + return requestSizeBuilder; + } + + public Histogram.Builder getResponseSizeBuilder() { + return responseSizeBuilder; + } + + public Counter.Builder getProviderCounterBuilder() { + return providerCounterBuilder; + } + + public Counter.Builder getConsumerCounterBuilder() { + return consumerCounterBuilder; + } + + public Gauge.Builder getThreadPoolConfigCoreBuilder() { + return threadPoolConfigCoreBuilder; + } + + public Gauge.Builder getThreadPoolConfigMaxBuilder() { + return threadPoolConfigMaxBuilder; + } + + public Gauge.Builder getThreadPoolConfigQueueBuilder() { + return threadPoolConfigQueueBuilder; + } + + public Gauge.Builder getThreadPoolActiveBuilder() { + return threadPoolActiveBuilder; + } + + public Gauge.Builder getThreadPoolIdleBuilder() { + return threadPoolIdleBuilder; + } + + public Gauge.Builder getThreadPoolQueueBuilder() { + return threadPoolQueueBuilder; + } + + Histogram buildClientTotal(String[] labelNames) { + return clientTotalBuilder + .name("sofa_client_total") + .help("sofa_client_total") + .labelNames(labelNames) + .create(); + } + + Histogram buildClientFail(String[] labelNames) { + return clientFailBuilder + .name("sofa_client_fail") + .help("sofa_client_fail") + .labelNames(labelNames) + .create(); + } + + Histogram buildServerTotal(String[] labelNames) { + return serverTotalBuilder + .name("sofa_server_total") + .help("sofa_server_total") + .labelNames(labelNames) + .create(); + } + + Histogram buildServerFail(String[] labelNames) { + return serverFailBuilder + .name("sofa_server_fail") + .help("sofa_server_fail") + .labelNames(labelNames) + .create(); + } + + Histogram buildRequestSize(String[] labelNames) { + return requestSizeBuilder + .name("sofa_request_size") + .help("sofa_request_size") + .unit(BYTES) + .labelNames(labelNames) + .create(); + } + + Histogram buildResponseSize(String[] labelNames) { + return responseSizeBuilder + .name("sofa_response_size") + .help("sofa_response_size") + .unit(BYTES) + .labelNames(labelNames) + .create(); + } + + Counter buildProviderCounter(String[] labelNames) { + return providerCounterBuilder + .name("sofa_provider") + .help("sofa_provider") + .labelNames(labelNames) + .create(); + } + + Counter buildConsumerCounter(String[] labelNames) { + return consumerCounterBuilder + .name("sofa_consumer") + .help("sofa_consumer") + .labelNames(labelNames) + .create(); + } + + Gauge buildThreadPoolConfigCore(String[] labelNames) { + return threadPoolConfigCoreBuilder + .name("sofa_threadpool_config_core") + .help("sofa_threadpool_config_core") + .unit(THREADS) + .labelNames(labelNames) + .create(); + } + + Gauge buildThreadPoolConfigMax(String[] labelNames) { + return threadPoolConfigMaxBuilder + .name("sofa_threadpool_config_max") + .help("sofa_threadpool_config_max") + .unit(THREADS) + .labelNames(labelNames) + .create(); + } + + Gauge buildThreadPoolConfigQueue(String[] labelNames) { + return threadPoolConfigQueueBuilder + .name("sofa_threadpool_config_queue") + .help("sofa_threadpool_config_queue") + .unit(TASKS) + .labelNames(labelNames) + .create(); + } + + Gauge buildThreadPoolActive(String[] labelNames) { + return threadPoolActiveBuilder + .name("sofa_threadpool_active") + .help("sofa_threadpool_active") + .unit(THREADS) + .labelNames(labelNames) + .create(); + } + + Gauge buildThreadPoolIdle(String[] labelNames) { + return threadPoolIdleBuilder + .name("sofa_threadpool_idle") + .help("sofa_threadpool_idle") + .unit(THREADS) + .labelNames(labelNames) + .create(); + } + + Gauge buildThreadPoolQueue(String[] labelNames) { + return threadPoolQueueBuilder + .name("sofa_threadpool_queue_size") + .help("sofa_threadpool_queue_size") + .unit(TASKS) + .labelNames(labelNames) + .create(); + } + + public static MetricsBuilder defaultOf() { + return new MetricsBuilder(); + } + +} diff --git a/metrics/metrics-prometheus/src/main/java/com/alipay/sofa/rpc/metrics/prometheus/SofaRpcMetricsCollector.java b/metrics/metrics-prometheus/src/main/java/com/alipay/sofa/rpc/metrics/prometheus/SofaRpcMetricsCollector.java new file mode 100644 index 000000000..4c872bbc6 --- /dev/null +++ b/metrics/metrics-prometheus/src/main/java/com/alipay/sofa/rpc/metrics/prometheus/SofaRpcMetricsCollector.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.metrics.prometheus; + +import com.alipay.sofa.rpc.common.RemotingConstants; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.core.request.SofaRequest; +import com.alipay.sofa.rpc.core.response.SofaResponse; +import com.alipay.sofa.rpc.event.ClientEndInvokeEvent; +import com.alipay.sofa.rpc.event.ConsumerSubEvent; +import com.alipay.sofa.rpc.event.Event; +import com.alipay.sofa.rpc.event.EventBus; +import com.alipay.sofa.rpc.event.ProviderPubEvent; +import com.alipay.sofa.rpc.event.ServerSendEvent; +import com.alipay.sofa.rpc.event.ServerStartedEvent; +import com.alipay.sofa.rpc.event.ServerStoppedEvent; +import com.alipay.sofa.rpc.event.Subscriber; +import io.prometheus.client.Collector; +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; +import io.prometheus.client.Histogram; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicReference; + +public class SofaRpcMetricsCollector extends Collector implements AutoCloseable { + + private static final String[] INVOKE_LABEL_NAMES = new String[]{"app", "service", "method", "protocol", "invoke_type", "caller_app"}; + + private String[] commonLabelNames; + private String[] commonLabelValues; + + private PrometheusSubscriber subscriber; + + private Histogram clientTotal; + + private Histogram clientFail; + + private Histogram serverTotal; + + private Histogram serverFail; + + private Histogram requestSize; + + private Histogram responseSize; + + private Counter providerCounter; + + private Counter consumerCounter; + + private Gauge threadPoolConfigCore; + + private Gauge threadPoolConfigMax; + + private Gauge threadPoolConfigQueue; + + private Gauge threadPoolActive; + + private Gauge threadPoolIdle; + + private Gauge threadPoolQueue; + + + private final AtomicReference serverConfigReference = new AtomicReference<>(); + private final AtomicReference executorReference = new AtomicReference<>(); + + public SofaRpcMetricsCollector() { + this(Collections.emptyMap(), MetricsBuilder.defaultOf()); + } + + public SofaRpcMetricsCollector(Map commonLabels) { + this(commonLabels, MetricsBuilder.defaultOf()); + } + + public SofaRpcMetricsCollector(MetricsBuilder metricsBuilder) { + this(Collections.emptyMap(), metricsBuilder); + } + + public SofaRpcMetricsCollector(Map commonLabels, MetricsBuilder metricsBuilder) { + this.commonLabelNames = commonLabels.keySet().toArray(new String[0]); + this.commonLabelValues = commonLabels.values().toArray(new String[0]); + this.subscriber = new PrometheusSubscriber(); + + String[] labelNames; + int clength = commonLabelNames.length; + if (clength == 0) { + labelNames = INVOKE_LABEL_NAMES; + } else { + int ilength = INVOKE_LABEL_NAMES.length; + labelNames = new String[clength + ilength]; + System.arraycopy(commonLabelNames, 0, labelNames, 0, clength); + System.arraycopy(INVOKE_LABEL_NAMES, 0, labelNames, clength, ilength); + } + + this.clientTotal = metricsBuilder.buildClientTotal(labelNames); + this.clientFail = metricsBuilder.buildClientFail(labelNames); + this.serverTotal = metricsBuilder.buildServerTotal(labelNames); + this.serverFail = metricsBuilder.buildServerFail(labelNames); + this.requestSize = metricsBuilder.buildRequestSize(labelNames); + this.responseSize = metricsBuilder.buildResponseSize(labelNames); + + this.providerCounter = metricsBuilder.buildProviderCounter(commonLabelNames); + this.consumerCounter = metricsBuilder.buildConsumerCounter(commonLabelNames); + this.threadPoolConfigCore = metricsBuilder.buildThreadPoolConfigCore(commonLabelNames); + this.threadPoolConfigMax = metricsBuilder.buildThreadPoolConfigMax(commonLabelNames); + this.threadPoolConfigQueue = metricsBuilder.buildThreadPoolConfigQueue(commonLabelNames); + this.threadPoolActive = metricsBuilder.buildThreadPoolActive(commonLabelNames); + this.threadPoolIdle = metricsBuilder.buildThreadPoolIdle(commonLabelNames); + this.threadPoolQueue = metricsBuilder.buildThreadPoolQueue(commonLabelNames); + + registerSubscriber(); + } + + private void registerSubscriber() { + EventBus.register(ClientEndInvokeEvent.class, subscriber); + EventBus.register(ServerSendEvent.class, subscriber); + EventBus.register(ServerStartedEvent.class, subscriber); + EventBus.register(ServerStoppedEvent.class, subscriber); + EventBus.register(ProviderPubEvent.class, subscriber); + EventBus.register(ConsumerSubEvent.class, subscriber); + } + + @Override + public List collect() { + List result = new ArrayList<>(); + result.addAll(clientTotal.collect()); + result.addAll(clientFail.collect()); + result.addAll(serverTotal.collect()); + result.addAll(serverFail.collect()); + result.addAll(requestSize.collect()); + result.addAll(responseSize.collect()); + result.addAll(providerCounter.collect()); + result.addAll(consumerCounter.collect()); + + ServerConfig serverConfig = serverConfigReference.get(); + ThreadPoolExecutor threadPoolExecutor = executorReference.get(); + if (serverConfig != null) { + threadPoolConfigCore.labels(commonLabelValues) + .set(serverConfig.getCoreThreads()); + result.addAll(threadPoolConfigCore.collect()); + + + threadPoolConfigMax.labels(commonLabelValues) + .set(serverConfig.getMaxThreads()); + result.addAll(threadPoolConfigMax.collect()); + + + threadPoolConfigQueue.labels(commonLabelValues) + .set(serverConfig.getQueues()); + result.addAll(threadPoolConfigQueue.collect()); + } + + + if (threadPoolExecutor != null) { + threadPoolActive.labels(commonLabelValues) + .set(threadPoolExecutor.getActiveCount()); + result.addAll(threadPoolActive.collect()); + + threadPoolIdle.labels(commonLabelValues) + .set(threadPoolExecutor.getPoolSize() - threadPoolExecutor.getActiveCount()); + result.addAll(threadPoolIdle.collect()); + + + threadPoolQueue.labels(commonLabelValues) + .set(threadPoolExecutor.getQueue().size()); + result.addAll(threadPoolQueue.collect()); + } + + return result; + } + + @Override + public void close() throws Exception { + EventBus.unRegister(ClientEndInvokeEvent.class, subscriber); + EventBus.unRegister(ServerSendEvent.class, subscriber); + EventBus.unRegister(ServerStartedEvent.class, subscriber); + EventBus.unRegister(ServerStoppedEvent.class, subscriber); + EventBus.unRegister(ProviderPubEvent.class, subscriber); + EventBus.unRegister(ConsumerSubEvent.class, subscriber); + } + + private static Long getLongAvoidNull(Object object) { + if (object == null) { + return null; + } + + if (object instanceof Integer) { + return Long.parseLong(object.toString()); + } + + return (Long) object; + } + + private static String getStringAvoidNull(Object object) { + if (object == null) { + return null; + } + + return (String) object; + } + + + private class PrometheusSubscriber extends Subscriber { + + @Override + public void onEvent(Event event) { + if (event instanceof ClientEndInvokeEvent) { + onEvent((ClientEndInvokeEvent) event); + } else if (event instanceof ServerSendEvent) { + onEvent((ServerSendEvent) event); + } else if (event instanceof ServerStartedEvent) { + onEvent((ServerStartedEvent) event); + } else if (event instanceof ServerStoppedEvent) { + onEvent((ServerStoppedEvent) event); + } else if (event instanceof ProviderPubEvent) { + onEvent((ProviderPubEvent) event); + } else if (event instanceof ConsumerSubEvent) { + onEvent((ConsumerSubEvent) event); + } else { + throw new IllegalArgumentException("unexpected event: " + event); + } + } + + private void onEvent(ClientEndInvokeEvent event) { + InvokeMeta meta = new InvokeMeta( + event.getRequest(), + event.getResponse(), + getLongAvoidNull(RpcInternalContext.getContext().getAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE)) + ); + long elapsed = meta.elapsed(); + String[] labelValues = meta.labelValues(commonLabelValues); + + clientTotal.labels(labelValues).observe(elapsed); + if (!meta.success()) { + clientFail.labels(labelValues).observe(elapsed); + } + + RpcInternalContext context = RpcInternalContext.getContext(); + requestSize.labels(labelValues) + .observe(getLongAvoidNull(context.getAttachment(RpcConstants.INTERNAL_KEY_REQ_SIZE))); + responseSize.labels(labelValues) + .observe(getLongAvoidNull(context.getAttachment(RpcConstants.INTERNAL_KEY_RESP_SIZE))); + } + + private void onEvent(ServerSendEvent event) { + InvokeMeta meta = new InvokeMeta( + event.getRequest(), + event.getResponse(), + getLongAvoidNull(RpcInternalContext.getContext().getAttachment(RpcConstants.INTERNAL_KEY_IMPL_ELAPSE)) + ); + long elapsed = meta.elapsed(); + String[] labelValues = meta.labelValues(commonLabelValues); + + serverTotal.labels(labelValues).observe(elapsed); + if (!meta.success()) { + serverFail.labels(labelValues).observe(elapsed); + } + } + + private void onEvent(ServerStartedEvent event) { + serverConfigReference.set(event.getServerConfig()); + executorReference.set(event.getThreadPoolExecutor()); + } + + private void onEvent(ServerStoppedEvent event) { + serverConfigReference.set(null); + executorReference.set(null); + } + + private void onEvent(ProviderPubEvent event) { + providerCounter.labels(commonLabelValues) + .inc(); + } + + private void onEvent(ConsumerSubEvent event) { + consumerCounter.labels(commonLabelValues) + .inc(); + } + } + + private static class InvokeMeta { + + private final SofaRequest request; + private final SofaResponse response; + private final long elapsed; + + private InvokeMeta(SofaRequest request, SofaResponse response, long elapsed) { + this.request = request; + this.response = response; + this.elapsed = elapsed; + } + + public String app() { + return Optional.ofNullable(request.getTargetAppName()).orElse(""); + } + + public String callerApp() { + return Optional.ofNullable(getStringAvoidNull( + request.getRequestProp(RemotingConstants.HEAD_APP_NAME))).orElse(""); + } + + public String service() { + return Optional.ofNullable(request.getTargetServiceUniqueName()).orElse(""); + } + + public String method() { + return Optional.ofNullable(request.getMethodName()).orElse(""); + } + + public String protocol() { + return Optional.ofNullable(getStringAvoidNull( + request.getRequestProp(RemotingConstants.HEAD_PROTOCOL))).orElse(""); + } + + public String invokeType() { + return Optional.ofNullable(request.getInvokeType()).orElse(""); + } + + public long elapsed() { + return elapsed; + } + + public boolean success() { + return response != null + && !response.isError() + && response.getErrorMsg() == null + && (!(response.getAppResponse() instanceof Throwable)); + } + + public String[] labelValues(String[] commonLabelValues) { + String[] labelValues; + String[] invokeLabelValues = new String[]{app(), service(), method(), protocol(), invokeType(), callerApp()}; + int clength = commonLabelValues.length; + if (clength == 0) { + labelValues = invokeLabelValues; + } else { + int ilength = invokeLabelValues.length; + labelValues = new String[clength + ilength]; + System.arraycopy(commonLabelValues, 0, labelValues, 0, clength); + System.arraycopy(invokeLabelValues, 0, labelValues, clength, ilength); + } + return labelValues; + } + } + +} diff --git a/metrics/metrics-prometheus/src/test/java/com/alipay/sofa/rpc/metrics/prometheus/SofaRpcMetricsCollectorTest.java b/metrics/metrics-prometheus/src/test/java/com/alipay/sofa/rpc/metrics/prometheus/SofaRpcMetricsCollectorTest.java new file mode 100644 index 000000000..5ee0fc0e4 --- /dev/null +++ b/metrics/metrics-prometheus/src/test/java/com/alipay/sofa/rpc/metrics/prometheus/SofaRpcMetricsCollectorTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.metrics.prometheus; + +import com.alipay.sofa.rpc.common.RemotingConstants; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.core.exception.SofaRpcException; +import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback; +import com.alipay.sofa.rpc.core.request.RequestBase; +import com.alipay.sofa.rpc.core.request.SofaRequest; +import com.alipay.sofa.rpc.core.response.SofaResponse; +import com.alipay.sofa.rpc.event.ClientEndInvokeEvent; +import com.alipay.sofa.rpc.event.ConsumerSubEvent; +import com.alipay.sofa.rpc.event.EventBus; +import com.alipay.sofa.rpc.event.ProviderPubEvent; +import com.alipay.sofa.rpc.event.ServerSendEvent; +import com.alipay.sofa.rpc.event.ServerStartedEvent; +import com.alipay.sofa.rpc.event.ServerStoppedEvent; +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class SofaRpcMetricsCollectorTest { + + @Test + public void testPrometheusMetricsCollect1() throws Exception { + try (SofaRpcMetricsCollector collector = new SofaRpcMetricsCollector()) { + CollectorRegistry registry = new CollectorRegistry(); + collector.register(registry); + + SofaRequest request = buildRequest(); + SofaResponse successResponse = buildSuccessResponse(); + SofaResponse failResponse = buildFailResponse(); + RpcInternalContext.getContext() + .setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, 100) + .setAttachment(RpcConstants.INTERNAL_KEY_IMPL_ELAPSE, 10) + .setAttachment(RpcConstants.INTERNAL_KEY_REQ_SIZE, 3) + .setAttachment(RpcConstants.INTERNAL_KEY_RESP_SIZE, 4); + + List samplesList; + + EventBus.post(new ClientEndInvokeEvent(request, successResponse, null)); + EventBus.post(new ClientEndInvokeEvent(request, failResponse, null)); + + EventBus.post(new ServerSendEvent(request, successResponse, null)); + EventBus.post(new ServerSendEvent(request, failResponse, null)); + + EventBus.post(new ProviderPubEvent(new ProviderConfig<>())); + + EventBus.post(new ConsumerSubEvent(new ConsumerConfig<>())); + samplesList = collector.collect(); + Assert.assertEquals(samplesList.size(), 8); + + ServerConfig serverConfig = new ServerConfig(); + EventBus.post(new ServerStartedEvent(serverConfig, new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()))); + samplesList = collector.collect(); + Assert.assertEquals(samplesList.size(), 14); + + EventBus.post(new ServerStoppedEvent(serverConfig)); + samplesList = collector.collect(); + Assert.assertEquals(samplesList.size(), 8); + +// new HTTPServer(new InetSocketAddress(9000),registry); +// Thread.currentThread().join(); + } + } + + @Test + public void testPrometheusMetricsCollect2() throws Exception { + MetricsBuilder metricsBuilder = new MetricsBuilder(); + // set buckets + metricsBuilder.getClientTotalBuilder() + .exponentialBuckets(1, 2, 15); + metricsBuilder.getClientFailBuilder() + .linearBuckets(0, 5, 15); + + Map testLabels = new HashMap<>(); + testLabels.put("from", "test"); + + try (SofaRpcMetricsCollector collector = new SofaRpcMetricsCollector(testLabels, metricsBuilder)) { + CollectorRegistry registry = new CollectorRegistry(); + collector.register(registry); + + SofaRequest request = buildRequest(); + SofaResponse successResponse = buildSuccessResponse(); + SofaResponse failResponse = buildFailResponse(); + RpcInternalContext.getContext() + .setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, 100) + .setAttachment(RpcConstants.INTERNAL_KEY_IMPL_ELAPSE, 10) + .setAttachment(RpcConstants.INTERNAL_KEY_REQ_SIZE, 3) + .setAttachment(RpcConstants.INTERNAL_KEY_RESP_SIZE, 4); + + List samplesList; + + EventBus.post(new ClientEndInvokeEvent(request, successResponse, null)); + EventBus.post(new ClientEndInvokeEvent(request, failResponse, null)); + + EventBus.post(new ProviderPubEvent(new ProviderConfig<>())); + + EventBus.post(new ConsumerSubEvent(new ConsumerConfig<>())); + + ServerConfig serverConfig = new ServerConfig(); + EventBus.post(new ServerStartedEvent(serverConfig, new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()))); + + samplesList = collector.collect(); + Assert.assertEquals(samplesList.size(), 14); + +// new HTTPServer(new InetSocketAddress(9000),registry); +// Thread.currentThread().join(); + } + } + + private SofaRequest buildRequest() throws NoSuchMethodException { + SofaRequest request = new SofaRequest(); + request.setInterfaceName(TestService.class.getName()); + request.setMethodName("echoStr"); + request.setMethod(TestService.class.getMethod("func")); + request.setMethodArgs(new Object[] {}); + request.setMethodArgSigs(new String[] {}); + request.setTargetServiceUniqueName(TestService.class.getName() + ":1.0"); + request.setTargetAppName("targetApp"); + request.setSerializeType((byte) 11); + request.setTimeout(1024); + request.setInvokeType(RpcConstants.INVOKER_TYPE_SYNC); + request.addRequestProp(RemotingConstants.HEAD_APP_NAME, "app"); + request.addRequestProp(RemotingConstants.HEAD_PROTOCOL, "bolt"); + request.setSofaResponseCallback(new SofaResponseCallback() { + @Override + public void onAppResponse(Object appResponse, String methodName, RequestBase request) { + + } + + @Override + public void onAppException(Throwable throwable, String methodName, RequestBase request) { + + } + + @Override + public void onSofaException(SofaRpcException sofaException, String methodName, RequestBase request) { + + } + }); + return request; + } + + private SofaResponse buildSuccessResponse() { + SofaResponse response = new SofaResponse(); + response.setAppResponse("123"); + return response; + } + + private SofaResponse buildFailResponse() { + SofaResponse response = new SofaResponse(); + response.setAppResponse(new RuntimeException()); + return response; + } + + private static class TestService { + + public String func() { + return null; + } + } +} diff --git a/metrics/pom.xml b/metrics/pom.xml index e4a753544..e50d83a16 100644 --- a/metrics/pom.xml +++ b/metrics/pom.xml @@ -16,6 +16,7 @@ metrics-lookout metrics-micrometer + metrics-prometheus From ef1a488dfeea9cb9822e91b3bd4c0bd3909af583 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 3 Jan 2023 20:49:37 +0800 Subject: [PATCH 06/11] chore(deps): bump protobuf-java from 3.16.1 to 3.16.3 in /bom (#1252) * chore(deps): bump protobuf-java from 3.16.1 to 3.16.3 in /bom Bumps [protobuf-java](https://github.com/protocolbuffers/protobuf) from 3.16.1 to 3.16.3. - [Release notes](https://github.com/protocolbuffers/protobuf/releases) - [Changelog](https://github.com/protocolbuffers/protobuf/blob/main/generate_changelog.py) - [Commits](https://github.com/protocolbuffers/protobuf/compare/v3.16.1...v3.16.3) --- updated-dependencies: - dependency-name: com.google.protobuf:protobuf-java dependency-type: direct:production ... Signed-off-by: dependabot[bot] * chore(deps): bump protobuf-java from 3.16.1 to 3.16.3 in /bom Bumps [protobuf-java](https://github.com/protocolbuffers/protobuf) from 3.16.1 to 3.16.3. - [Release notes](https://github.com/protocolbuffers/protobuf/releases) - [Changelog](https://github.com/protocolbuffers/protobuf/blob/main/generate_changelog.py) - [Commits](https://github.com/protocolbuffers/protobuf/compare/v3.16.1...v3.16.3) --- updated-dependencies: - dependency-name: com.google.protobuf:protobuf-java dependency-type: direct:production ... Signed-off-by: dependabot[bot] Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: liujianjun.ljj --- bom/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bom/pom.xml b/bom/pom.xml index 092f0b831..36922541f 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -33,7 +33,7 @@ 3.3.13 0.9.2 - 3.16.1 + 3.16.3 2.12.7 2.12.7.1 0.6.12 From 7809a25af98a6e63794b16fe063311dd64b8101b Mon Sep 17 00:00:00 2001 From: evenliu Date: Tue, 3 Jan 2023 20:59:34 +0800 Subject: [PATCH 07/11] Fix the problem of triple header losing information (#1289) * fix triple client and server log uniqueName without version info * fix triple client and server log uniqueName without version info * fix triple client and server log uniqueName without version info Co-authored-by: liujianjun.ljj --- .../sofatracer/TripleTracerAdapter.java | 5 ++ .../sofatracer/TripleTracerAdapterTest.java | 56 +++++++++++++++++++ 2 files changed, 61 insertions(+) create mode 100644 remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/tracer/sofatracer/TripleTracerAdapterTest.java diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/tracer/sofatracer/TripleTracerAdapter.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/tracer/sofatracer/TripleTracerAdapter.java index ff40585dd..e17af6406 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/tracer/sofatracer/TripleTracerAdapter.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/tracer/sofatracer/TripleTracerAdapter.java @@ -24,6 +24,7 @@ import com.alipay.sofa.rpc.common.RemotingConstants; import com.alipay.sofa.rpc.common.RpcConstants; import com.alipay.sofa.rpc.common.TracerCompatibleConstants; +import com.alipay.sofa.rpc.common.utils.CodecUtils; import com.alipay.sofa.rpc.common.utils.JSONUtils; import com.alipay.sofa.rpc.common.utils.StringUtils; import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator; @@ -81,6 +82,10 @@ public static void beforeSend(SofaRequest sofaRequest, ConsumerConfig consumerCo header.put(RemotingConstants.HEAD_METHOD_NAME, sofaRequest.getMethodName()); header.put(RemotingConstants.HEAD_TARGET_SERVICE, sofaRequest.getTargetServiceUniqueName()); header.put(RemotingConstants.HEAD_TARGET_APP, sofaRequest.getTargetAppName()); + Map requestProps = sofaRequest.getRequestProps(); + if (requestProps != null) { + CodecUtils.flatCopyTo("", requestProps, header); + } //客户端的启动 SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext(); //获取并不弹出 diff --git a/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/tracer/sofatracer/TripleTracerAdapterTest.java b/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/tracer/sofatracer/TripleTracerAdapterTest.java new file mode 100644 index 000000000..1ea78e9b3 --- /dev/null +++ b/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/tracer/sofatracer/TripleTracerAdapterTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.tracer.sofatracer; + +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.core.request.SofaRequest; +import com.alipay.sofa.rpc.server.triple.TripleHeadKeys; +import io.grpc.Metadata; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static com.alipay.sofa.rpc.common.RemotingConstants.HEAD_TARGET_SERVICE; + +/** + * @author Even + * @date 2022/12/29 1:53 PM + */ +public class TripleTracerAdapterTest { + + @Test + public void testBeforeSend() { + SofaRequest sofaRequest = new SofaRequest(); + sofaRequest.setTargetServiceUniqueName("targetService1"); + sofaRequest.addRequestProp("triple.header.key", "triple.header.value"); + Map map = new HashMap(); + map.put("key1", "value1"); + map.put("key2", "value2"); + sofaRequest.addRequestProp("triple.header.object", map); + sofaRequest.addRequestProp(HEAD_TARGET_SERVICE, "targetService2"); + ConsumerConfig consumerConfig = new ConsumerConfig(); + Metadata metadata = new Metadata(); + TripleTracerAdapter.beforeSend(sofaRequest, consumerConfig, metadata); + Assert.assertEquals("targetService2", metadata.get(TripleHeadKeys.getKey(HEAD_TARGET_SERVICE))); + Assert.assertEquals("triple.header.value", metadata.get(TripleHeadKeys.getKey("triple.header.key"))); + Assert.assertEquals("value1", metadata.get(TripleHeadKeys.getKey("triple.header.object.key1"))); + Assert.assertEquals("value2", metadata.get(TripleHeadKeys.getKey("triple.header.object.key2"))); + } + +} \ No newline at end of file From e203f2be6fdefc6cf0ee59e0889bf0e26d6097a1 Mon Sep 17 00:00:00 2001 From: Marco Date: Thu, 5 Jan 2023 10:41:20 +0800 Subject: [PATCH 08/11] triple async call support trace log (#1282) * triple async call support trace log * review update * review update Co-authored-by: liujianjun.ljj --- .../transport/triple/TripleClientInvoker.java | 186 ++++++++++++++---- .../triple/TripleClientTransport.java | 16 +- 2 files changed, 152 insertions(+), 50 deletions(-) diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientInvoker.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientInvoker.java index 4e6de65e6..7079f3f13 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientInvoker.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientInvoker.java @@ -16,15 +16,26 @@ */ package com.alipay.sofa.rpc.transport.triple; +import com.alipay.sofa.rpc.client.ProviderInfo; import com.alipay.sofa.rpc.codec.Serializer; import com.alipay.sofa.rpc.codec.SerializerFactory; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.common.utils.ClassLoaderUtils; import com.alipay.sofa.rpc.common.utils.StringUtils; import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.context.BaggageResolver; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.context.RpcInvokeContext; +import com.alipay.sofa.rpc.context.RpcRuntimeContext; import com.alipay.sofa.rpc.core.exception.RpcErrorType; import com.alipay.sofa.rpc.core.exception.SofaRpcException; import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback; import com.alipay.sofa.rpc.core.request.SofaRequest; import com.alipay.sofa.rpc.core.response.SofaResponse; +import com.alipay.sofa.rpc.event.ClientAsyncReceiveEvent; +import com.alipay.sofa.rpc.event.ClientEndInvokeEvent; +import com.alipay.sofa.rpc.event.EventBus; +import com.alipay.sofa.rpc.filter.FilterChain; import com.alipay.sofa.rpc.log.Logger; import com.alipay.sofa.rpc.log.LoggerFactory; import com.alipay.sofa.rpc.message.ResponseFuture; @@ -66,6 +77,8 @@ public class TripleClientInvoker implements TripleInvoker { protected ConsumerConfig consumerConfig; + protected ProviderInfo providerInfo; + protected Method sofaStub; protected boolean useGeneric; @@ -75,9 +88,11 @@ public class TripleClientInvoker implements TripleInvoker { private Map methodMap = new ConcurrentHashMap<>(); - public TripleClientInvoker(ConsumerConfig consumerConfig, Channel channel) { + public TripleClientInvoker(ConsumerConfig consumerConfig, ProviderInfo providerInfo, Channel channel) { this.channel = channel; this.consumerConfig = consumerConfig; + this.providerInfo = providerInfo; + useGeneric = checkIfUseGeneric(consumerConfig); cacheCommonData(consumerConfig); @@ -142,6 +157,9 @@ public ResponseFuture asyncInvoke(SofaRequest sofaRequest, int timeout) throws E SofaResponseCallback sofaResponseCallback = sofaRequest.getSofaResponseCallback(); TripleResponseFuture future = new TripleResponseFuture(sofaRequest, timeout); + ClassLoader currentClassLoader = ClassLoaderUtils.getCurrentClassLoader(); + RpcInternalContext context = RpcInternalContext.getContext(); + if (!useGeneric) { Method m = methodMap.get(sofaRequest.getMethodName()); if (m == null) { @@ -166,25 +184,12 @@ public ResponseFuture asyncInvoke(SofaRequest sofaRequest, int timeout) throws E m.invoke(stub, sofaRequest.getMethodArgs()[0], new StreamObserver() { @Override public void onNext(Object o) { - if (sofaResponseCallback != null) { - sofaResponseCallback.onAppResponse(o, sofaRequest.getMethodName(), sofaRequest); - } else { - future.setSuccess(o); - } + processSuccess(false, context, sofaRequest, o, sofaResponseCallback, future, currentClassLoader); } @Override public void onError(Throwable throwable) { - if (sofaResponseCallback != null) { - Status status = Status.fromThrowable(throwable); - if (status.getCode() == Status.Code.UNKNOWN) { - sofaResponseCallback.onAppException(throwable, sofaRequest.getMethodName(), sofaRequest); - }else { - sofaResponseCallback.onSofaException(new SofaRpcException(RpcErrorType.UNKNOWN, status.getCause()), sofaRequest.getMethodName(), sofaRequest); - } - } else { - future.setFailure(throwable); - } + processError(context, sofaRequest, throwable, sofaResponseCallback, future, currentClassLoader); } @Override @@ -198,35 +203,12 @@ public void onCompleted() { ClientCalls.asyncUnaryCall(channel.newCall(methodDescriptor, buildCustomCallOptions(sofaRequest, timeout)), request, new StreamObserver() { @Override public void onNext(Object o) { - Object appResponse = null; - Response response = (Response) o; - byte[] responseDate = response.getData().toByteArray(); - Class returnType = sofaRequest.getMethod().getReturnType(); - if (returnType != void.class) { - if (responseDate != null && responseDate.length > 0) { - Serializer responseSerializer = SerializerFactory.getSerializer(response.getSerializeType()); - appResponse = responseSerializer.decode(new ByteArrayWrapperByteBuf(responseDate), returnType, null); - } - } - if (sofaResponseCallback != null) { - sofaResponseCallback.onAppResponse(appResponse, sofaRequest.getMethodName(), sofaRequest); - } else { - future.setSuccess(appResponse); - } + processSuccess(true, context, sofaRequest, o, sofaResponseCallback, future, currentClassLoader); } @Override public void onError(Throwable throwable) { - if (sofaResponseCallback != null) { - Status status = Status.fromThrowable(throwable); - if (status.getCode() == Status.Code.UNKNOWN) { - sofaResponseCallback.onAppException(throwable, sofaRequest.getMethodName(), sofaRequest); - }else { - sofaResponseCallback.onSofaException(new SofaRpcException(RpcErrorType.UNKNOWN, status.getCause()), sofaRequest.getMethodName(), sofaRequest); - } - } else { - future.setFailure(throwable); - } + processError(context, sofaRequest, throwable, sofaResponseCallback, future, currentClassLoader); } @Override @@ -238,6 +220,128 @@ public void onCompleted() { return future; } + private void processSuccess(boolean needDecode, RpcInternalContext context, SofaRequest sofaRequest, Object o, SofaResponseCallback sofaResponseCallback, TripleResponseFuture future, ClassLoader classLoader) { + ClassLoader oldCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(classLoader); + RpcInternalContext.setContext(context); + + SofaResponse sofaResponse = new SofaResponse(); + sofaResponse.setAppResponse(o); + if (EventBus.isEnable(ClientAsyncReceiveEvent.class)) { + EventBus.post(new ClientAsyncReceiveEvent(consumerConfig, providerInfo, + sofaRequest, sofaResponse, null)); + } + + pickupBaggage(context, sofaResponse); + + // do async filter after respond server + FilterChain chain = consumerConfig.getConsumerBootstrap().getCluster().getFilterChain(); + if (chain != null) { + chain.onAsyncResponse(consumerConfig, sofaRequest, sofaResponse, null); + } + + recordClientElapseTime(context); + + if (EventBus.isEnable(ClientEndInvokeEvent.class)) { + EventBus.post(new ClientEndInvokeEvent(sofaRequest, sofaResponse, null)); + } + + Object appResponse = o; + if (needDecode) { + Response response = (Response) o; + byte[] responseDate = response.getData().toByteArray(); + Class returnType = sofaRequest.getMethod().getReturnType(); + if (returnType != void.class) { + if (responseDate != null && responseDate.length > 0) { + Serializer responseSerializer = SerializerFactory.getSerializer(response.getSerializeType()); + appResponse = responseSerializer.decode(new ByteArrayWrapperByteBuf(responseDate), returnType, null); + } + } + } + + if (sofaResponseCallback != null) { + sofaResponseCallback.onAppResponse(appResponse, sofaRequest.getMethodName(), sofaRequest); + } else { + future.setSuccess(appResponse); + } + } finally { + Thread.currentThread().setContextClassLoader(oldCl); + RpcInvokeContext.removeContext(); + RpcInternalContext.removeAllContext(); + } + } + + private void processError(RpcInternalContext context, SofaRequest sofaRequest, Throwable throwable, SofaResponseCallback sofaResponseCallback, TripleResponseFuture future, ClassLoader classLoader) { + ClassLoader oldCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(classLoader); + RpcInternalContext.setContext(context); + + if (EventBus.isEnable(ClientAsyncReceiveEvent.class)) { + EventBus.post(new ClientAsyncReceiveEvent(consumerConfig, providerInfo, + sofaRequest, null, throwable)); + } + + // do async filter after respond server + FilterChain chain = consumerConfig.getConsumerBootstrap().getCluster().getFilterChain(); + if (chain != null) { + chain.onAsyncResponse(consumerConfig, sofaRequest, null, throwable); + } + + recordClientElapseTime(context); + + if (EventBus.isEnable(ClientEndInvokeEvent.class)) { + EventBus.post(new ClientEndInvokeEvent(sofaRequest, null, throwable)); + } + + if (sofaResponseCallback != null) { + Status status = Status.fromThrowable(throwable); + if (status.getCode() == Status.Code.UNKNOWN) { + sofaResponseCallback.onAppException(throwable, sofaRequest.getMethodName(), sofaRequest); + }else { + sofaResponseCallback.onSofaException(new SofaRpcException(RpcErrorType.UNKNOWN, status.getCause()), sofaRequest.getMethodName(), sofaRequest); + } + } else { + future.setFailure(throwable); + } + } finally { + Thread.currentThread().setContextClassLoader(oldCl); + RpcInvokeContext.removeContext(); + RpcInternalContext.removeAllContext(); + } + } + + protected void recordClientElapseTime(RpcInternalContext context) { + if (context != null) { + Long startTime = (Long) context.removeAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME); + if (startTime != null) { + context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, RpcRuntimeContext.now() - startTime); + } + } + } + + protected void pickupBaggage(RpcInternalContext context, SofaResponse response) { + if (RpcInvokeContext.isBaggageEnable()) { + RpcInvokeContext old = null; + RpcInvokeContext newContext = null; + if (context != null) { + old = (RpcInvokeContext) context.getAttachment(RpcConstants.HIDDEN_KEY_INVOKE_CONTEXT); + } + if (old != null) { + RpcInvokeContext.setContext(old); + } + newContext = RpcInvokeContext.getContext(); + BaggageResolver.pickupFromResponse(newContext, response); + + if (old != null) { + old.getAllResponseBaggage().putAll(newContext.getAllResponseBaggage()); + old.getAllRequestBaggage().putAll(newContext.getAllRequestBaggage()); + } + + } + } + private MethodDescriptor getMethodDescriptor(SofaRequest sofaRequest) { String serviceName = sofaRequest.getInterfaceName(); String methodName = sofaRequest.getMethodName(); diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientTransport.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientTransport.java index 8327eab59..3d696d624 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientTransport.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/transport/triple/TripleClientTransport.java @@ -17,16 +17,15 @@ package com.alipay.sofa.rpc.transport.triple; import com.alipay.sofa.rpc.client.ProviderInfo; -import com.alipay.sofa.rpc.common.utils.ClassLoaderUtils; import com.alipay.sofa.rpc.common.utils.NetUtils; import com.alipay.sofa.rpc.context.RpcInternalContext; import com.alipay.sofa.rpc.context.RpcInvokeContext; import com.alipay.sofa.rpc.core.exception.RpcErrorType; import com.alipay.sofa.rpc.core.exception.SofaRpcException; import com.alipay.sofa.rpc.core.exception.SofaTimeOutException; -import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback; import com.alipay.sofa.rpc.core.request.SofaRequest; import com.alipay.sofa.rpc.core.response.SofaResponse; +import com.alipay.sofa.rpc.event.ClientAfterSendEvent; import com.alipay.sofa.rpc.event.ClientBeforeSendEvent; import com.alipay.sofa.rpc.event.ClientSyncReceiveEvent; import com.alipay.sofa.rpc.event.EventBus; @@ -101,7 +100,7 @@ public void connect() { } protected TripleClientInvoker buildClientInvoker() { - return new TripleClientInvoker(transportConfig.getConsumerConfig(), channel); + return new TripleClientInvoker(transportConfig.getConsumerConfig(), providerInfo, channel); } @Override @@ -161,7 +160,6 @@ public int currentRequests() { @Override public ResponseFuture asyncSend(SofaRequest request, int timeout) throws SofaRpcException { - SofaResponse sofaResponse = null; SofaRpcException throwable = null; try { @@ -178,9 +176,8 @@ public ResponseFuture asyncSend(SofaRequest request, int timeout) throws SofaRpc throwable = convertToRpcException(e); throw throwable; } finally { - if (EventBus.isEnable(ClientSyncReceiveEvent.class)) { - EventBus.post(new ClientSyncReceiveEvent(transportConfig.getConsumerConfig(), - transportConfig.getProviderInfo(), request, sofaResponse, throwable)); + if (EventBus.isEnable(ClientAfterSendEvent.class)) { + EventBus.post(new ClientAfterSendEvent(request)); } } @@ -194,9 +191,7 @@ public SofaResponse syncSend(SofaRequest request, int timeout) throws SofaRpcExc try { RpcInternalContext context = RpcInternalContext.getContext(); - beforeSend(context, request); - RpcInvokeContext invokeContext = RpcInvokeContext.getContext(); invokeContext.put(TripleContants.SOFA_REQUEST_KEY, request); invokeContext.put(TripleContants.SOFA_CONSUMER_CONFIG_KEY, transportConfig.getConsumerConfig()); @@ -206,6 +201,9 @@ public SofaResponse syncSend(SofaRequest request, int timeout) throws SofaRpcExc throwable = convertToRpcException(e); throw throwable; } finally { + if (EventBus.isEnable(ClientAfterSendEvent.class)) { + EventBus.post(new ClientAfterSendEvent(request)); + } if (EventBus.isEnable(ClientSyncReceiveEvent.class)) { EventBus.post(new ClientSyncReceiveEvent(transportConfig.getConsumerConfig(), transportConfig.getProviderInfo(), request, sofaResponse, throwable)); From 9eff2f651317b9a6c64cd9479ead26b12af40235 Mon Sep 17 00:00:00 2001 From: evenliu Date: Thu, 5 Jan 2023 10:47:38 +0800 Subject: [PATCH 09/11] fix triple multi classloader switch problem (#1278) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix triple multi classloader switch problem * fix triple client and server log uniqueName without version info * fix triple client and server log uniqueName without version info * fix triple client and server log uniqueName without version info * add two uniqueId case * feat: tri test case 4 ark Co-authored-by: liujianjun.ljj Co-authored-by: 均源 --- .../TripleClientRegistryApplication.java | 2 +- .../rpc/server/triple/GenericServiceImpl.java | 28 +- .../sofa/rpc/server/triple/TripleServer.java | 44 +-- .../rpc/server/triple/UniqueIdInvoker.java | 34 +++ .../server/triple/GenericServiceImplTest.java | 15 +- .../rpc/test/AnotherHelloServiceImpl.java | 51 ++++ .../sofa/rpc/test/triple/GreeterImpl2.java | 58 ++++ .../rpc/test/triple/TripleServerTest.java | 49 +++ .../rpc/triple/TripleHessianInterface.java | 2 + .../triple/TripleHessianInterfaceImpl.java | 9 + .../rpc/triple/TripleHessianInvokeTest.java | 68 +++++ .../rpc/triple/ark/MultiClassLoaderTest.java | 278 ++++++++++++++++++ 12 files changed, 598 insertions(+), 40 deletions(-) create mode 100644 test/test-common/src/main/java/com/alipay/sofa/rpc/test/AnotherHelloServiceImpl.java create mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/GreeterImpl2.java create mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/ark/MultiClassLoaderTest.java diff --git a/example/src/test/java/com/alipay/sofa/rpc/triple/TripleClientRegistryApplication.java b/example/src/test/java/com/alipay/sofa/rpc/triple/TripleClientRegistryApplication.java index 4c8e7dfd9..7a0ab355a 100644 --- a/example/src/test/java/com/alipay/sofa/rpc/triple/TripleClientRegistryApplication.java +++ b/example/src/test/java/com/alipay/sofa/rpc/triple/TripleClientRegistryApplication.java @@ -44,7 +44,7 @@ public static void main(String[] args) { .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) //.setRegistry(registryConfig) .setApplication(clientApp) - .setDirectUrl("tri://10.15.232.18:19544") + .setDirectUrl("tri://127.0.0.1:50051") .setRegister(false); SofaGreeterTriple.IGreeter greeterBlockingStub = consumerConfig.refer(); diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/GenericServiceImpl.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/GenericServiceImpl.java index 00c06f150..dff254195 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/GenericServiceImpl.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/GenericServiceImpl.java @@ -18,17 +18,13 @@ import com.alipay.sofa.rpc.codec.Serializer; import com.alipay.sofa.rpc.codec.SerializerFactory; -import com.alipay.sofa.rpc.common.cache.ReflectCache; import com.alipay.sofa.rpc.common.utils.ClassTypeUtils; import com.alipay.sofa.rpc.common.utils.ClassUtils; -import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator; -import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.core.exception.RpcErrorType; import com.alipay.sofa.rpc.core.exception.SofaRpcException; import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException; import com.alipay.sofa.rpc.core.request.SofaRequest; import com.alipay.sofa.rpc.core.response.SofaResponse; -import com.alipay.sofa.rpc.invoke.Invoker; import com.alipay.sofa.rpc.log.Logger; import com.alipay.sofa.rpc.log.LoggerFactory; import com.alipay.sofa.rpc.tracer.sofatracer.TracingContextKey; @@ -52,18 +48,11 @@ public class GenericServiceImpl extends SofaGenericServiceTriple.GenericServiceI private static final Logger LOGGER = LoggerFactory.getLogger(GenericServiceImpl.class); - protected Invoker invoker; - protected ProviderConfig providerConfig; + protected UniqueIdInvoker invoker; - public GenericServiceImpl(Invoker invoker, ProviderConfig providerConfig) { + public GenericServiceImpl(UniqueIdInvoker invoker) { super(); this.invoker = invoker; - this.providerConfig = providerConfig; - String key = ConfigUniqueNameGenerator.getUniqueName(providerConfig); - // 缓存接口的方法 - for (Method m : providerConfig.getProxyClass().getMethods()) { - ReflectCache.putOverloadMethodCache(key, m); - } } @Override @@ -74,15 +63,16 @@ public void generic(Request request, StreamObserver responseObserver) SofaRequest sofaRequest = TracingContextKey.getKeySofaRequest().get(Context.current()); String methodName = sofaRequest.getMethodName(); try { - String key = ConfigUniqueNameGenerator.getUniqueName(providerConfig); - ClassLoader interfaceClassLoader = providerConfig.getProxyClass().getClassLoader(); - Thread.currentThread().setContextClassLoader(interfaceClassLoader); + ClassLoader serviceClassLoader = invoker.getServiceClassLoader(sofaRequest); + Thread.currentThread().setContextClassLoader(serviceClassLoader); + Method declaredMethod = invoker.getDeclaredMethod(sofaRequest, request); + if (declaredMethod == null) { + throw new SofaRpcException(RpcErrorType.SERVER_NOT_FOUND_INVOKER, "Cannot find invoke method " + + methodName); + } Class[] argTypes = getArgTypes(request); Serializer serializer = SerializerFactory.getSerializer(request.getSerializeType()); - - Method declaredMethod = ReflectCache.getOverloadMethodCache(key, methodName, request.getArgTypesList() - .toArray(new String[0])); Object[] invokeArgs = getInvokeArgs(request, argTypes, serializer); // fill sofaRequest diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServer.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServer.java index 6d3dcbf14..b5268a0f7 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServer.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/TripleServer.java @@ -242,9 +242,14 @@ public void stop() { @Override public void registerProcessor(ProviderConfig providerConfig, Invoker instance) { - Object ref = providerConfig.getRef(); this.lock.lock(); try { + String key = ConfigUniqueNameGenerator.getUniqueName(providerConfig); + ReflectCache.registerServiceClassLoader(key, providerConfig.getProxyClass().getClassLoader()); + // 缓存接口的方法 + for (Method m : providerConfig.getProxyClass().getMethods()) { + ReflectCache.putOverloadMethodCache(key, m); + } // wrap invoker to support unique id UniqueIdInvoker oldInvoker = this.invokerMap.putIfAbsent(providerConfig.getInterfaceId(), new UniqueIdInvoker()); if (null != oldInvoker) { @@ -261,22 +266,7 @@ public void registerProcessor(ProviderConfig providerConfig, Invoker instance) { throw new IllegalStateException("Can not expose service with interface:" + providerConfig.getInterfaceId() + " and unique id: " + providerConfig.getUniqueId()); } - // create service definition - ServerServiceDefinition serviceDef; - if (SofaProtoUtils.isProtoClass(ref)) { - // refer is BindableService - this.setBindableProxiedImpl(providerConfig, uniqueIdInvoker); - BindableService bindableService = (BindableService) providerConfig.getRef(); - serviceDef = bindableService.bindService(); - } else { - GenericServiceImpl genericService = new GenericServiceImpl(uniqueIdInvoker, providerConfig); - genericService.setProxiedImpl(genericService); - serviceDef = buildSofaServiceDef(genericService, providerConfig); - } - - List interceptorList = buildInterceptorChain(serviceDef); - ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept( - serviceDef, interceptorList); + ServerServiceDefinition serviceDefinition = getServerServiceDefinition(providerConfig, uniqueIdInvoker); this.serviceInfo.put(providerConfig, serviceDefinition); ServerServiceDefinition ssd = this.handlerRegistry.addService(serviceDefinition); if (ssd != null) { @@ -294,6 +284,26 @@ public void registerProcessor(ProviderConfig providerConfig, Invoker instance) { } } + private ServerServiceDefinition getServerServiceDefinition(ProviderConfig providerConfig, UniqueIdInvoker uniqueIdInvoker) { + // create service definition + ServerServiceDefinition serviceDef; + if (SofaProtoUtils.isProtoClass(providerConfig.getRef())) { + // refer is BindableService + this.setBindableProxiedImpl(providerConfig, uniqueIdInvoker); + BindableService bindableService = (BindableService) providerConfig.getRef(); + serviceDef = bindableService.bindService(); + } else { + GenericServiceImpl genericService = new GenericServiceImpl(uniqueIdInvoker); + genericService.setProxiedImpl(genericService); + serviceDef = buildSofaServiceDef(genericService, providerConfig); + } + + List interceptorList = buildInterceptorChain(serviceDef); + ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept( + serviceDef, interceptorList); + return serviceDefinition; + } + private void setBindableProxiedImpl(ProviderConfig providerConfig, Invoker invoker) { Class implClass = providerConfig.getRef().getClass(); try { diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/UniqueIdInvoker.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/UniqueIdInvoker.java index 7f725e057..15b615bd3 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/UniqueIdInvoker.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/server/triple/UniqueIdInvoker.java @@ -16,7 +16,9 @@ */ package com.alipay.sofa.rpc.server.triple; +import com.alipay.sofa.rpc.common.cache.ReflectCache; import com.alipay.sofa.rpc.common.utils.StringUtils; +import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator; import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.context.RpcInvokeContext; import com.alipay.sofa.rpc.core.exception.RpcErrorType; @@ -24,7 +26,10 @@ import com.alipay.sofa.rpc.core.request.SofaRequest; import com.alipay.sofa.rpc.core.response.SofaResponse; import com.alipay.sofa.rpc.invoke.Invoker; +import com.alipay.sofa.rpc.server.ProviderProxyInvoker; +import triple.Request; +import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; @@ -164,4 +169,33 @@ private Invoker findInvoker(String interfaceName, String uniqueId) { } } + public ClassLoader getServiceClassLoader(SofaRequest sofaRequest) { + String uniqueName = this.getServiceUniqueName(sofaRequest); + return ReflectCache.getServiceClassLoader(uniqueName); + } + + public Method getDeclaredMethod(SofaRequest sofaRequest, Request request) { + String uniqueName = this.getServiceUniqueName(sofaRequest); + return ReflectCache.getOverloadMethodCache(uniqueName, sofaRequest.getMethodName(), request + .getArgTypesList() + .toArray(new String[0])); + } + + private String getServiceUniqueName(SofaRequest sofaRequest) { + this.readLock.lock(); + try { + Invoker invoker = this.findInvoker(sofaRequest.getInterfaceName(), getUniqueIdFromInvokeContext()); + ProviderConfig providerConfig = null; + if (invoker instanceof ProviderProxyInvoker) { + providerConfig = ((ProviderProxyInvoker) invoker).getProviderConfig(); + } + if (providerConfig == null) { + throw new SofaRpcException(RpcErrorType.SERVER_NOT_FOUND_INVOKER, "Cannot find providerConfig"); + } + return ConfigUniqueNameGenerator.getUniqueName(providerConfig); + } finally { + this.readLock.unlock(); + } + } + } diff --git a/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/server/triple/GenericServiceImplTest.java b/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/server/triple/GenericServiceImplTest.java index b5b1b046c..e0c566ec3 100644 --- a/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/server/triple/GenericServiceImplTest.java +++ b/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/server/triple/GenericServiceImplTest.java @@ -19,11 +19,12 @@ import com.alipay.sofa.rpc.codec.Serializer; import com.alipay.sofa.rpc.codec.SerializerFactory; import com.alipay.sofa.rpc.codec.sofahessian.SofaHessianSerializer; +import com.alipay.sofa.rpc.common.cache.ReflectCache; import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator; import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.core.request.SofaRequest; -import com.alipay.sofa.rpc.filter.ProviderInvoker; import com.alipay.sofa.rpc.message.MessageBuilder; +import com.alipay.sofa.rpc.server.ProviderProxyInvoker; import com.alipay.sofa.rpc.tracer.sofatracer.TracingContextKey; import com.alipay.sofa.rpc.transport.ByteArrayWrapperByteBuf; import com.alipay.sofa.rpc.transport.triple.TripleClientInvoker; @@ -53,8 +54,16 @@ public GenericServiceImplTest(){ providerConfig = new ProviderConfig<>(); providerConfig.setRef(new HelloServiceImpl()); providerConfig.setInterfaceId(HelloService.class.getName()); - ProviderInvoker invoker = new ProviderInvoker<>(providerConfig); - genericService = new GenericServiceImpl(invoker,providerConfig); + String key = ConfigUniqueNameGenerator.getUniqueName(providerConfig); + ReflectCache.registerServiceClassLoader(key, providerConfig.getProxyClass().getClassLoader()); + // 缓存接口的方法 + for (Method m : providerConfig.getProxyClass().getMethods()) { + ReflectCache.putOverloadMethodCache(key, m); + } + ProviderProxyInvoker invoker = new ProviderProxyInvoker(providerConfig); + UniqueIdInvoker uniqueIdInvoker = new UniqueIdInvoker(); + uniqueIdInvoker.registerInvoker(providerConfig, invoker); + genericService = new GenericServiceImpl(uniqueIdInvoker); responseObserver = new MockStreamObserver<>(); } diff --git a/test/test-common/src/main/java/com/alipay/sofa/rpc/test/AnotherHelloServiceImpl.java b/test/test-common/src/main/java/com/alipay/sofa/rpc/test/AnotherHelloServiceImpl.java new file mode 100644 index 000000000..eddf1b66e --- /dev/null +++ b/test/test-common/src/main/java/com/alipay/sofa/rpc/test/AnotherHelloServiceImpl.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test; + +/** + * + * @author GengZhang + */ +public class AnotherHelloServiceImpl implements HelloService { + + private int sleep; + + private String result; + + public AnotherHelloServiceImpl() { + + } + + public AnotherHelloServiceImpl(String result) { + this.result = result; + } + + public AnotherHelloServiceImpl(int sleep) { + this.sleep = sleep; + } + + @Override + public String sayHello(String name, int age) { + if (sleep > 0) { + try { + Thread.sleep(sleep); + } catch (Exception ignore) { + } + } + return result != null ? result : "hello " + name + " from server! age: " + age; + } +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/GreeterImpl2.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/GreeterImpl2.java new file mode 100644 index 000000000..625788cf8 --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/GreeterImpl2.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.triple; + +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.examples.helloworld.SofaGreeterTriple; +import io.grpc.stub.StreamObserver; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +/** + * @author Even + * @date 2023/1/3 12:17 AM + */ +public class GreeterImpl2 extends SofaGreeterTriple.GreeterImplBase { + + //Intentionally using unsupported format + static final DateTimeFormatter[] datetimeFormatter = new DateTimeFormatter[] { DateTimeFormatter.ISO_DATE_TIME, + DateTimeFormatter.ISO_LOCAL_DATE_TIME, + DateTimeFormatter.BASIC_ISO_DATE }; + + @Override + public void sayHello(HelloRequest req, StreamObserver responseObserver) { + HelloRequest.DateTime reqDateTime = req.getDateTime(); + int i = 0; + try { + i = Integer.parseInt(reqDateTime.getTime()); + } catch (Exception e) { + //TODO: handle exception + } + LocalDateTime dt = LocalDateTime.now(); + String dtStr = dt.format(datetimeFormatter[i % datetimeFormatter.length]); + HelloRequest.DateTime rplyDateTime = HelloRequest.DateTime.newBuilder(reqDateTime) + .setDate(dtStr).build(); + HelloReply reply = HelloReply.newBuilder() + .setMessage("Hello2 " + req.getName()) + .setDateTime(rplyDateTime) + .build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } +} \ No newline at end of file diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/TripleServerTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/TripleServerTest.java index 93bb26758..7014b1ce5 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/TripleServerTest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/triple/TripleServerTest.java @@ -242,6 +242,55 @@ public void testSyncTimeout() { Assert.assertTrue(exp); } + @Test + public void testExposeTwoUniqueId() { + ApplicationConfig applicationConfig = new ApplicationConfig().setAppName("triple-server1"); + + int port = 50052; + + ServerConfig serverConfig = new ServerConfig() + .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setPort(port); + + ProviderConfig providerConfig = new ProviderConfig() + .setApplication(applicationConfig) + .setUniqueId("abc") + .setBootstrap(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setInterfaceId(SofaGreeterTriple.IGreeter.class.getName()) + .setRef(new GreeterImpl()) + .setServer(serverConfig); + + providerConfig.export(); + + ProviderConfig providerConfig2 = new ProviderConfig() + .setApplication(applicationConfig) + .setUniqueId("abcd") + .setBootstrap(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setInterfaceId(SofaGreeterTriple.IGreeter.class.getName()) + .setRef(new GreeterImpl2()) + .setServer(serverConfig); + providerConfig2.export(); + + ConsumerConfig consumerConfig = new ConsumerConfig<>(); + consumerConfig.setInterfaceId(SofaGreeterTriple.IGreeter.class.getName()) + .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setUniqueId("abc") + .setDirectUrl("tri://127.0.0.1:" + port); + SofaGreeterTriple.IGreeter greeterBlockingStub = consumerConfig.refer(); + HelloRequest.DateTime dateTime = HelloRequest.DateTime.newBuilder().setDate("2018-12-28").setTime("11:13:00") + .build(); + HelloRequest request = HelloRequest.newBuilder().setName("world").setDateTime(dateTime).build(); + Assert.assertEquals("Hello world", greeterBlockingStub.sayHello(request).getMessage()); + + ConsumerConfig consumerConfig2 = new ConsumerConfig<>(); + consumerConfig2.setInterfaceId(SofaGreeterTriple.IGreeter.class.getName()) + .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setUniqueId("abcd") + .setDirectUrl("tri://127.0.0.1:" + port); + SofaGreeterTriple.IGreeter greeterBlockingStub2 = consumerConfig2.refer(); + Assert.assertEquals("Hello2 world", greeterBlockingStub2.sayHello(request).getMessage()); + } + @BeforeClass public static void adBeforeClass() { RpcRunningState.setUnitTestMode(true); diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterface.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterface.java index cbe176baa..e526a3fad 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterface.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterface.java @@ -26,6 +26,8 @@ public interface TripleHessianInterface { String call1(); + String findFlag(); + Response call2(Request request); boolean testPressureMark(String name); diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterfaceImpl.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterfaceImpl.java index 768d6573b..f232b8dcb 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterfaceImpl.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterfaceImpl.java @@ -33,6 +33,10 @@ public TripleHessianInterfaceImpl() { this.flag = ""; } + public TripleHessianInterfaceImpl(String flag) { + this.flag = flag; + } + @Override public void call() { this.flag = "call"; @@ -44,6 +48,11 @@ public String call1() { return flag; } + @Override + public String findFlag() { + return this.flag; + } + @Override public Response call2(Request request) { if (request != null) { diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInvokeTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInvokeTest.java index bdfddf3ca..ad88a35e3 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInvokeTest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInvokeTest.java @@ -269,6 +269,74 @@ public void testExposeTwice() { serverConfig.destroy(); } + @Test + public void testExposeTwoUniqueId() { + String uniqueId = "uniqueId"; + RpcRunningState.setDebugMode(true); + + ApplicationConfig clientApp = new ApplicationConfig().setAppName("triple-client"); + + ApplicationConfig serverApp = new ApplicationConfig().setAppName("triple-server"); + + int port = getPort(); + + ServerConfig serverConfig = new ServerConfig() + .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setPort(port); + + TripleHessianInterfaceImpl ref = new TripleHessianInterfaceImpl("test1"); + ProviderConfig providerConfig = getProviderConfig() + .setApplication(serverApp) + .setUniqueId(uniqueId) + .setBootstrap(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setInterfaceId(TripleHessianInterface.class.getName()) + .setRef(ref) + .setServer(serverConfig) + .setRegister(false); + + providerConfig.export(); + + String uniqueId2 = "uniqueId2"; + RpcRunningState.setDebugMode(true); + + TripleHessianInterfaceImpl ref2 = new TripleHessianInterfaceImpl("test2"); + ProviderConfig providerConfig2 = getProviderConfig() + .setApplication(serverApp) + .setUniqueId(uniqueId2) + .setBootstrap(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setInterfaceId(TripleHessianInterface.class.getName()) + .setRef(ref2) + .setServer(serverConfig) + .setRegister(false); + providerConfig2.export(); + + ConsumerConfig consumerConfig = new ConsumerConfig<>(); + consumerConfig.setInterfaceId(TripleHessianInterface.class.getName()) + .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setDirectUrl("localhost:" + port) + .setUniqueId(uniqueId) + .setRegister(false) + .setApplication(clientApp); + TripleHessianInterface helloService = consumerConfig.refer(); + LOGGER.info("Grpc stub bean successful: {}", helloService.getClass().getName()); + Assert.assertEquals("test1", helloService.findFlag()); + + ConsumerConfig consumerConfig2 = new ConsumerConfig<>(); + consumerConfig2.setInterfaceId(TripleHessianInterface.class.getName()) + .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setDirectUrl("localhost:" + port) + .setUniqueId(uniqueId2) + .setRegister(false) + .setApplication(clientApp); + TripleHessianInterface helloService2 = consumerConfig2.refer(); + LOGGER.info("Grpc stub bean successful: {}", helloService2.getClass().getName()); + Assert.assertEquals("test2", helloService2.findFlag()); + + providerConfig2.unExport(); + providerConfig.unExport(); + serverConfig.destroy(); + } + @Test public void testTripleRpcInvokeContext() { ApplicationConfig clientApp = new ApplicationConfig().setAppName("triple-client"); diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/ark/MultiClassLoaderTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/ark/MultiClassLoaderTest.java new file mode 100644 index 000000000..0d173dc71 --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/ark/MultiClassLoaderTest.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.triple.ark; + +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.test.AnotherHelloServiceImpl; +import com.alipay.sofa.rpc.test.HelloService; +import com.alipay.sofa.rpc.test.HelloServiceImpl; +import org.junit.Test; +import org.springframework.util.Assert; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * + * @author junyuan + * @version MultiClassLoaderTest.java, v 0.1 2022年12月30日 15:02 junyuan Exp $ + */ +public class MultiClassLoaderTest { + + private static final AtomicInteger PORT = new AtomicInteger(51003); + + public void init() { + + } + + /* + * Launcher.AppClassLoader -> HelloService (Interface) + * / \ + * / \ + * cl1 cl2 + * | | + * v v + * HelloServiceImpl AnotherHelloServiceImpl + */ + @Test + public void test() throws InterruptedException { + ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader(); + + URL url4Impl = HelloServiceImpl.class.getProtectionDomain().getCodeSource().getLocation(); + URL url4AnotherImpl = AnotherHelloServiceImpl.class.getProtectionDomain().getCodeSource().getLocation(); + + SpecificTestClassLoader cl1 = new SpecificTestClassLoader("test_classloader_1", new URL[]{url4Impl}, oldClassLoader); + cl1.addWhiteListClass(HelloServiceImpl.class.getName()); + + SpecificTestClassLoader cl2 = new SpecificTestClassLoader("test_classloader_2", new URL[]{url4AnotherImpl}, oldClassLoader); + cl2.addWhiteListClass(AnotherHelloServiceImpl.class.getName()); + + SpecificTestClassLoader clientClassloader = new SpecificTestClassLoader("client_classLoader", new URL[]{}, oldClassLoader); + + int port = getPort(); + + ServerConfig serverConfig = new ServerConfig() + .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setPort(port); + + ApplicationConfig clientApp = new ApplicationConfig().setAppName("triple-client"); + ApplicationConfig serverApp = new ApplicationConfig().setAppName("triple-server"); + + Thread.currentThread().setContextClassLoader(cl1); + ProviderConfig providerConfig1 = getProviderConfig(1) + .setServer(serverConfig) + .setBootstrap(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setInterfaceId(HelloService.class.getName()) + .setRef(new HelloServiceImpl()) + .setApplication(serverApp) + .setRegister(false) + .setUniqueId("helloService"); + providerConfig1.export(); + + Thread.currentThread().setContextClassLoader(cl2); + ProviderConfig providerConfig2 = getProviderConfig(1) + .setServer(serverConfig) + .setBootstrap(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setInterfaceId(HelloService.class.getName()) + .setRef(new AnotherHelloServiceImpl()) + .setApplication(serverApp) + .setRegister(false) + .setUniqueId("anotherHelloService"); + providerConfig2.export(); + + Thread.currentThread().setContextClassLoader(clientClassloader); + ConsumerConfig consumerConfig1 = new ConsumerConfig<>(); + consumerConfig1.setInterfaceId(HelloService.class.getName()) + .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setDirectUrl("tri://127.0.0.1:" + port) + .setRegister(false) + .setApplication(clientApp) + .setUniqueId("helloService"); + HelloService helloService1 = consumerConfig1.refer(); + + ConsumerConfig consumerConfig2 = new ConsumerConfig<>(); + consumerConfig2.setInterfaceId(HelloService.class.getName()) + .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setDirectUrl("tri://127.0.0.1:" + port) + .setRegister(false) + .setApplication(clientApp) + .setUniqueId("anotherHelloService"); + HelloService helloService2 = consumerConfig2.refer(); + + String result1 = helloService1.sayHello("impl1", 1); + + Assert.isTrue(result1.contains("impl1"), "helloService1 run fail, result is " + result1); + + String result2 = helloService2.sayHello("impl2", 2); + Assert.isTrue(result2.contains("impl2"), "anotherHelloService2 run fail, result is " + result2); + + Thread.currentThread().setContextClassLoader(cl1); + providerConfig1.unExport(); + Thread.currentThread().setContextClassLoader(cl2); + providerConfig2.unExport(); + + Thread.currentThread().setContextClassLoader(oldClassLoader); + serverConfig.destroy(); + + // ========================================================================================= + // then another brand new cl would do server init + SpecificTestClassLoader cl3 = new SpecificTestClassLoader("test_classloader_3", new URL[]{url4AnotherImpl}, oldClassLoader); + cl3.addWhiteListClass(AnotherHelloServiceImpl.class.getName()); + SpecificTestClassLoader clientClassloader2 = new SpecificTestClassLoader("client_classLoader_2", new URL[]{}, oldClassLoader); + + Thread.currentThread().setContextClassLoader(cl3); + ProviderConfig providerConfig3 = getProviderConfig(1) + .setServer(serverConfig) + .setBootstrap(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setInterfaceId(HelloService.class.getName()) + .setRef(new AnotherHelloServiceImpl()) + .setApplication(serverApp) + .setRegister(false) + .setUniqueId("anotherHelloService"); + providerConfig2.export(); + + Thread.currentThread().setContextClassLoader(clientClassloader2); + ConsumerConfig consumerConfig3 = new ConsumerConfig<>(); + consumerConfig3.setInterfaceId(HelloService.class.getName()) + .setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE) + .setDirectUrl("tri://127.0.0.1:" + port) + .setRegister(false) + .setApplication(clientApp) + .setUniqueId("anotherHelloService"); + HelloService helloService3 = consumerConfig3.refer(); + + String result3 = helloService3.sayHello("impl3", 2); + Assert.isTrue(result3.contains("impl3"), "anotherHelloService3 run fail, result is " + result3); + + Thread.currentThread().setContextClassLoader(cl3); + providerConfig3.unExport(); + + Thread.currentThread().setContextClassLoader(oldClassLoader); + serverConfig.destroy(); + } + + + private ProviderConfig getProviderConfig(int exportLimit) { + ProviderConfig providerConfig = new ProviderConfig<>(); + providerConfig.setRepeatedExportLimit(exportLimit); + return providerConfig; + } + + + private int getPort() { + int andIncrement = PORT.getAndIncrement(); + return andIncrement; + } + + + + /** + * a specific classloader + * would load class with refClassloader if load nothing by itself + * logic of this classloader would be similar to com.alipay.sofa.ark.container.service.classloader.BizClassLoader + */ + class SpecificTestClassLoader extends URLClassLoader { + + private String identity; + + private ClassLoader refClassLoader; + + private Set blackList = new HashSet<>(); + + /** active only if not null */ + private Set whiteList = new HashSet<>(); + + public SpecificTestClassLoader(String identity, URL[] urls) { + super(urls); + this.identity = identity; + } + + public SpecificTestClassLoader(String identity, URL[] urls, ClassLoader ref) { + super(urls); + this.identity = identity; + this.refClassLoader = ref; + } + + @Override + public Class loadClass(String name, boolean resolve) throws ClassNotFoundException { + Class clazz = null; + + // skip load if in black list + if (!blackList.contains(name)) { + clazz = whiteListLoad(name, resolve); + } + + if (clazz == null) { + clazz = refClassLoader.loadClass(name); + } + + if (clazz == null) { + throw new ClassNotFoundException(); + } + + return clazz; + } + + @Override + public Enumeration getResources(String name) throws IOException { + Enumeration urls = super.getResources(name); + if (!urls.hasMoreElements()) { + urls = refClassLoader.getResources(name); + } + return urls; + } + + @Override + public URL getResource(String name) { + URL url = super.getResource(name); + if (url == null ) { + url = refClassLoader.getResource(name); + } + return url; + } + + /** do load only if white list is not empty and class do in white list */ + private Class whiteListLoad(String className, boolean resolve) throws ClassNotFoundException { + Class clazz = null; + if (!whiteList.isEmpty()) { + if (!whiteList.contains(className)) { + // class is not allowed to load with current cl + return null; + } + } + + return super.loadClass(className, resolve); + } + + public void addBlackListClass(String className) { + this.blackList.add(className); + } + + public void addWhiteListClass(String className) { + this.whiteList.add(className); + } + } +} \ No newline at end of file From cded0cb52411271340f7462a4e3a151f3dd787ba Mon Sep 17 00:00:00 2001 From: Jason Song Date: Fri, 6 Jan 2023 10:09:06 +0800 Subject: [PATCH 10/11] Add security policy (#1293) --- SECURITY.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 SECURITY.md diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 000000000..137842035 --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,9 @@ +# Security Policy + +## Reporting a Vulnerability + +If you have apprehensions regarding SOFAStack's security or you discover vulnerability or potential threat, don’t hesitate to get in touch with us by dropping a mail at sofastack@antgroup.com. + +In the mail, specify the description of the issue or potential threat. You are also urged to recommend the way to reproduce and replicate the issue. The SOFAStack community will get back to you after assessing and analysing the findings. + +PLEASE PAY ATTENTION to report the security issue on the security email before disclosing it on public domain. From af35a3270925daf39bb8e9eaa29b57713d69331c Mon Sep 17 00:00:00 2001 From: evenliu Date: Fri, 6 Jan 2023 16:14:39 +0800 Subject: [PATCH 11/11] upgrade rpc version to 5.9.1 (#1294) Co-authored-by: liujianjun.ljj --- all/pom.xml | 2 +- bom/pom.xml | 2 +- core/api/src/main/java/com/alipay/sofa/rpc/common/Version.java | 2 +- pom.xml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/all/pom.xml b/all/pom.xml index c268f5575..86849d20f 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa sofa-rpc-all - 5.9.1-SNAPSHOT + 5.9.1 ${project.groupId}:${project.artifactId} diff --git a/bom/pom.xml b/bom/pom.xml index 36922541f..baa842a73 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -10,7 +10,7 @@ pom - 5.9.1-SNAPSHOT + 5.9.1 3.20.0-GA 1.9.8 4.1.77.Final diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/common/Version.java b/core/api/src/main/java/com/alipay/sofa/rpc/common/Version.java index e29efb4b4..f7b931e64 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/common/Version.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/common/Version.java @@ -37,6 +37,6 @@ public final class Version { /** * 当前Build版本,每次发布修改 */ - public static final String BUILD_VERSION = "5.9.1_20221207115251"; + public static final String BUILD_VERSION = "5.9.1_20230106144430"; } diff --git a/pom.xml b/pom.xml index ca2d55c74..496ec3cb0 100644 --- a/pom.xml +++ b/pom.xml @@ -77,7 +77,7 @@ - 5.9.1-SNAPSHOT + 5.9.1 1.33 true true