From 3e317ff7449a5822047ccfed5d8a600b6dad2405 Mon Sep 17 00:00:00 2001 From: Narziss Date: Thu, 31 Oct 2024 20:41:55 +0800 Subject: [PATCH] Optimize dynamic config: integrate Zookeeper & Nacos, support interface-level dynamic config (#1430) * integrate Zookeeper and Nacos as configuration centers * support dynamic config at the interface level * Optimize interface-level dynamic config * Modify dynamic config test * Add DynamicUrl * Optimize config update process * Modify config update process * Modify ApolloDynamicConfigManagerTest --- all/pom.xml | 12 + .../bootstrap/DefaultConsumerBootstrap.java | 69 +++++- .../apollo/ApolloDynamicConfigManager.java | 116 ++++++++- .../ApolloDynamicConfigManagerTest.java | 19 +- config/config-nacos/pom.xml | 107 ++++++++ .../nacos/NacosDynamicConfigManager.java | 207 ++++++++++++++++ ...ipay.sofa.rpc.dynamic.DynamicConfigManager | 2 + .../nacos/NacosDynamicConfigManagerTest.java | 58 +++++ .../config-nacos/src/test/resources/log4j.xml | 16 ++ config/config-zk/pom.xml | 117 +++++++++ .../zk/ZookeeperDynamicConfigManager.java | 231 ++++++++++++++++++ ...ipay.sofa.rpc.dynamic.DynamicConfigManager | 1 + .../zk/ZookeeperDynamicConfigManagerTest.java | 58 +++++ config/config-zk/src/test/resources/log4j.xml | 16 ++ config/pom.xml | 2 + .../sofa/rpc/client/AbstractCluster.java | 3 +- .../sofa/rpc/client/lb/AutoLoadBalancer.java | 2 +- .../rpc/config/AbstractInterfaceConfig.java | 40 ++- .../sofa/rpc/dynamic/ConfigChangeType.java | 39 +++ .../sofa/rpc/dynamic/ConfigChangedEvent.java | 88 +++++++ .../sofa/rpc/dynamic/DynamicConfigKeys.java | 25 +- .../rpc/dynamic/DynamicConfigManager.java | 54 +++- .../dynamic/DynamicConfigManagerFactory.java | 2 +- .../alipay/sofa/rpc/dynamic/DynamicUrl.java | 102 ++++++++ .../sofa/rpc/listener/ConfigListener.java | 11 + .../sofa/rpc/dynamic/DynamicUrlTest.java | 79 ++++++ test/test-integration/pom.xml | 20 +- .../test/config/ApolloDynamicConfigTest.java | 97 ++++++++ .../test/config/NacosDynamicConfigTest.java | 81 ++++++ .../config/ZookeeperDynamicConfigTest.java | 97 ++++++++ .../sofa/rpc/test/config/base/BaseZkTest.java | 62 +++++ 31 files changed, 1793 insertions(+), 40 deletions(-) create mode 100644 config/config-nacos/pom.xml create mode 100644 config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java create mode 100644 config/config-nacos/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager create mode 100644 config/config-nacos/src/test/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManagerTest.java create mode 100644 config/config-nacos/src/test/resources/log4j.xml create mode 100644 config/config-zk/pom.xml create mode 100644 config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java create mode 100644 config/config-zk/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager create mode 100644 config/config-zk/src/test/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManagerTest.java create mode 100644 config/config-zk/src/test/resources/log4j.xml create mode 100644 core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangeType.java create mode 100644 core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangedEvent.java create mode 100644 core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicUrl.java create mode 100644 core/api/src/test/java/com/alipay/sofa/rpc/dynamic/DynamicUrlTest.java create mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ApolloDynamicConfigTest.java create mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/NacosDynamicConfigTest.java create mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ZookeeperDynamicConfigTest.java create mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/base/BaseZkTest.java diff --git a/all/pom.xml b/all/pom.xml index 0034d3f69..3351476d9 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -304,6 +304,16 @@ sofa-rpc-config-apollo ${project.version} + + com.alipay.sofa + sofa-rpc-config-zk + ${project.version} + + + com.alipay.sofa + sofa-rpc-config-nacos + ${project.version} + com.alipay.sofa bolt @@ -553,6 +563,8 @@ com.alipay.sofa:sofa-rpc-tracer-opentracing-resteasy com.alipay.sofa:sofa-rpc-tracer-opentracing-triple com.alipay.sofa:sofa-rpc-config-apollo + com.alipay.sofa:sofa-rpc-config-zk + com.alipay.sofa:sofa-rpc-config-nacos com.alipay.sofa:sofa-rpc-doc-swagger diff --git a/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java b/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java index f6ba175a7..7e559d89b 100644 --- a/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java +++ b/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java @@ -20,6 +20,7 @@ import com.alipay.sofa.rpc.client.Cluster; import com.alipay.sofa.rpc.client.ClusterFactory; import com.alipay.sofa.rpc.client.ProviderGroup; +import com.alipay.sofa.rpc.common.RpcConstants; import com.alipay.sofa.rpc.common.SofaConfigs; import com.alipay.sofa.rpc.common.SofaOptions; import com.alipay.sofa.rpc.common.utils.CommonUtils; @@ -28,9 +29,12 @@ import com.alipay.sofa.rpc.config.RegistryConfig; import com.alipay.sofa.rpc.context.RpcRuntimeContext; import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException; +import com.alipay.sofa.rpc.dynamic.ConfigChangedEvent; +import com.alipay.sofa.rpc.dynamic.ConfigChangeType; import com.alipay.sofa.rpc.dynamic.DynamicConfigKeys; import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory; +import com.alipay.sofa.rpc.dynamic.DynamicUrl; import com.alipay.sofa.rpc.ext.Extension; import com.alipay.sofa.rpc.invoke.Invoker; import com.alipay.sofa.rpc.listener.ConfigListener; @@ -44,8 +48,10 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -54,6 +60,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static com.alipay.sofa.rpc.common.RpcConstants.REGISTRY_PROTOCOL_DOMAIN; +import static com.alipay.sofa.common.config.SofaConfigs.getOrDefault; /** * Default consumer bootstrap. @@ -146,7 +153,8 @@ public T refer() { // build cluster cluster = ClusterFactory.getCluster(this); // build listeners - consumerConfig.setConfigListener(buildConfigListener(this)); + ConfigListener configListener = buildConfigListener(this); + consumerConfig.setConfigListener(configListener); consumerConfig.setProviderInfoListener(buildProviderInfoListener(this)); // init cluster cluster.init(); @@ -156,13 +164,25 @@ public T refer() { proxyIns = (T) ProxyFactory.buildProxy(consumerConfig.getProxy(), consumerConfig.getProxyClass(), proxyInvoker); - //动态配置 + //请求级别动态配置参数 final String dynamicAlias = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_ALIAS); if (StringUtils.isNotBlank(dynamicAlias)) { final DynamicConfigManager dynamicManager = DynamicConfigManagerFactory.getDynamicManager( - consumerConfig.getAppName(), dynamicAlias); + consumerConfig.getAppName(), dynamicAlias); dynamicManager.initServiceConfiguration(consumerConfig.getInterfaceId()); } + + //接口级别动态配置参数 + Boolean dynamicConfigRefreshEnable = getOrDefault(DynamicConfigKeys.DYNAMIC_REFRESH_ENABLE); + String configCenterAddress = getOrDefault(DynamicConfigKeys.CONFIG_CENTER_ADDRESS); + if (dynamicConfigRefreshEnable && StringUtils.isNotBlank(configCenterAddress)) { + DynamicUrl dynamicUrl = new DynamicUrl(configCenterAddress); + //启用接口级别动态配置 + final DynamicConfigManager dynamicManager = DynamicConfigManagerFactory.getDynamicManager( + consumerConfig.getAppName(), dynamicUrl.getProtocol()); + dynamicManager.addListener(consumerConfig.getInterfaceId(), configListener); + dynamicManager.initServiceConfiguration(consumerConfig.getInterfaceId(), configListener); + } } catch (Exception e) { if (cluster != null) { cluster.destroy(); @@ -438,8 +458,47 @@ public void updateAllProviders(List groups) { */ private class ConsumerAttributeListener implements ConfigListener { + // 可以动态配置的选项 + private final Set supportDynamicConfigKeys = new HashSet<>(); + private final Map newValueMap = new HashMap<>(); + + ConsumerAttributeListener() { + supportDynamicConfigKeys.add(RpcConstants.CONFIG_KEY_TIMEOUT); + supportDynamicConfigKeys.add(RpcConstants.CONFIG_KEY_RETRIES); + supportDynamicConfigKeys.add(RpcConstants.CONFIG_KEY_LOADBALANCER); + } + + @Override + public void process(ConfigChangedEvent event) { + // 清除上次的动态配置值缓存 + consumerConfig.getDynamicConfigValueCache().clear(); + // 获取对应配置项的默认值 + for (String key : newValueMap.keySet()) { + if (consumerConfig.getConfigValueCache().get(key) != null) { + newValueMap.put(key, String.valueOf(consumerConfig.getConfigValueCache().get(key))); + } else { + newValueMap.put(key, null); + } + } + if (!event.getChangeType().equals(ConfigChangeType.DELETED)) { + // ADDED or MODIFIED + Map dynamicValueMap = event.getDynamicValueMap(); + for (String key : dynamicValueMap.keySet()) { + String tempKey = key.lastIndexOf(".") == -1 ? key : key.substring(key.lastIndexOf(".") + 1); + if (supportDynamicConfigKeys.contains(tempKey)) { + String value = dynamicValueMap.get(key); + if (StringUtils.isNotBlank(value)) { + consumerConfig.getDynamicConfigValueCache().put(key, value); + newValueMap.put(key, value); + } + } + } + } + attrUpdated(newValueMap); + } + @Override - public void configChanged(Map newValue) { + public void configChanged(Map newValueMap) { } @@ -452,7 +511,7 @@ public synchronized void attrUpdated(Map newValueMap) { Map oldValues = new HashMap(); boolean rerefer = false; try { // 检查是否有变化 - // 是否过滤map? + // 是否过滤map? for (Map.Entry entry : newValues.entrySet()) { String newValue = entry.getValue(); String oldValue = consumerConfig.queryAttribute(entry.getKey()); diff --git a/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java b/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java index 35367d5ca..d7aec05c5 100644 --- a/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java +++ b/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java @@ -16,13 +16,30 @@ */ package com.alipay.sofa.rpc.dynamic.apollo; +import com.alipay.sofa.common.config.SofaConfigs; import com.alipay.sofa.rpc.auth.AuthRuleGroup; +import com.alipay.sofa.rpc.common.utils.StringUtils; +import com.alipay.sofa.rpc.dynamic.ConfigChangeType; +import com.alipay.sofa.rpc.dynamic.ConfigChangedEvent; import com.alipay.sofa.rpc.dynamic.DynamicConfigKeyHelper; +import com.alipay.sofa.rpc.dynamic.DynamicConfigKeys; import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; import com.alipay.sofa.rpc.dynamic.DynamicHelper; import com.alipay.sofa.rpc.ext.Extension; +import com.alipay.sofa.rpc.listener.ConfigListener; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; import com.ctrip.framework.apollo.Config; +import com.ctrip.framework.apollo.ConfigChangeListener; import com.ctrip.framework.apollo.ConfigService; +import com.ctrip.framework.apollo.enums.PropertyChangeType; +import com.ctrip.framework.apollo.model.ConfigChange; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; /** * @author bystander @@ -34,41 +51,88 @@ @Extension(value = "apollo", override = true) public class ApolloDynamicConfigManager extends DynamicConfigManager { - private Config config; + private final static Logger LOGGER = LoggerFactory.getLogger(ApolloDynamicConfigManager.class); + + private static final String APOLLO_APPID_KEY = "app.id"; + + private static final String APOLLO_ADDR_KEY = "apollo.meta"; + + private static final String APOLLO_CLUSTER_KEY = "apollo.cluster"; + + private static final String APOLLO_PARAM_APPID_KEY = "appId"; + + private static final String APOLLO_PARAM_CLUSTER_KEY = "cluster"; + + private static final String APOLLO_PARAM_NAMESPACE_KEY = "namespace"; + + private static final String APOLLO_PROTOCOL_PREFIX = "http://"; + + private final ConcurrentMap watchListenerMap = new ConcurrentHashMap<>(); + + private final Config config; protected ApolloDynamicConfigManager(String appName) { - super(appName); - config = ConfigService.getAppConfig(); + super(appName, SofaConfigs.getOrCustomDefault(DynamicConfigKeys.CONFIG_CENTER_ADDRESS, "")); + if (getDynamicUrl() != null) { + if (StringUtils.isNotBlank(getDynamicUrl().getParam(APOLLO_PARAM_APPID_KEY))) { + System.setProperty(APOLLO_APPID_KEY, getDynamicUrl().getParam(APOLLO_PARAM_APPID_KEY)); + } + if (StringUtils.isNotBlank(getDynamicUrl().getAddress())) { + System.setProperty(APOLLO_ADDR_KEY, APOLLO_PROTOCOL_PREFIX + getDynamicUrl().getAddress()); + } + if (StringUtils.isNotBlank(getDynamicUrl().getParam(APOLLO_PARAM_CLUSTER_KEY))) { + System.setProperty(APOLLO_CLUSTER_KEY, getDynamicUrl().getParam(APOLLO_PARAM_CLUSTER_KEY)); + } + if (StringUtils.isNotBlank(getDynamicUrl().getParam(APOLLO_PARAM_NAMESPACE_KEY))) { + config = ConfigService.getConfig(getDynamicUrl().getParam(APOLLO_PARAM_NAMESPACE_KEY)); + } else { + config = ConfigService.getAppConfig(); + } + } else { + config = ConfigService.getAppConfig(); + } } @Override public void initServiceConfiguration(String service) { - //TODO not now + // TODO 暂不支持 + } + + @Override + public void initServiceConfiguration(String service, ConfigListener listener) { + try { + String rawConfig = config.getProperty(service, ""); + if (StringUtils.isNotBlank(rawConfig)) { + listener.process(new ConfigChangedEvent(service, rawConfig)); + } + } catch (Exception e) { + LOGGER.error("Init service configuration error", e); + } } @Override public String getProviderServiceProperty(String service, String key) { return config.getProperty(DynamicConfigKeyHelper.buildProviderServiceProKey(service, key), - DynamicHelper.DEFAULT_DYNAMIC_VALUE); + DynamicHelper.DEFAULT_DYNAMIC_VALUE); } @Override public String getConsumerServiceProperty(String service, String key) { return config.getProperty(DynamicConfigKeyHelper.buildConsumerServiceProKey(service, key), - DynamicHelper.DEFAULT_DYNAMIC_VALUE); + DynamicHelper.DEFAULT_DYNAMIC_VALUE); } @Override public String getProviderMethodProperty(String service, String method, String key) { return config.getProperty(DynamicConfigKeyHelper.buildProviderMethodProKey(service, method, key), - DynamicHelper.DEFAULT_DYNAMIC_VALUE); + DynamicHelper.DEFAULT_DYNAMIC_VALUE); } @Override public String getConsumerMethodProperty(String service, String method, String key) { return config.getProperty(DynamicConfigKeyHelper.buildConsumerMethodProKey(service, method, key), - DynamicHelper.DEFAULT_DYNAMIC_VALUE); + DynamicHelper.DEFAULT_DYNAMIC_VALUE); } @@ -77,4 +141,40 @@ public AuthRuleGroup getServiceAuthRule(String service) { //TODO 暂不支持 return null; } + + @Override + public void addListener(String key, ConfigListener listener) { + ApolloListener apolloListener = watchListenerMap.computeIfAbsent(key, k -> new ApolloListener()); + apolloListener.addListener(listener); + config.addChangeListener(apolloListener, Collections.singleton(key)); + } + + public static class ApolloListener implements ConfigChangeListener { + + private final Set listeners = new CopyOnWriteArraySet<>(); + + @Override + public void onChange(com.ctrip.framework.apollo.model.ConfigChangeEvent changeEvent) { + for (String key : changeEvent.changedKeys()) { + ConfigChange change = changeEvent.getChange(key); + ConfigChangedEvent event = + new ConfigChangedEvent(key, change.getNewValue(), getChangeType(change)); + listeners.forEach(listener -> listener.process(event)); + } + } + + private ConfigChangeType getChangeType(ConfigChange change) { + if (change.getChangeType() == PropertyChangeType.DELETED) { + return ConfigChangeType.DELETED; + } + if (change.getChangeType() == PropertyChangeType.ADDED) { + return ConfigChangeType.ADDED; + } + return ConfigChangeType.MODIFIED; + } + + void addListener(ConfigListener configListener) { + this.listeners.add(configListener); + } + } } \ No newline at end of file diff --git a/config/config-apollo/src/test/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManagerTest.java b/config/config-apollo/src/test/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManagerTest.java index 5fbb4daf0..2b08e90d3 100644 --- a/config/config-apollo/src/test/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManagerTest.java +++ b/config/config-apollo/src/test/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManagerTest.java @@ -16,6 +16,8 @@ */ package com.alipay.sofa.rpc.dynamic.apollo; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory; import com.alipay.sofa.rpc.dynamic.DynamicHelper; import com.alipay.sofa.rpc.log.Logger; import com.alipay.sofa.rpc.log.LoggerFactory; @@ -24,10 +26,11 @@ public class ApolloDynamicConfigManagerTest { - private final static Logger logger = LoggerFactory - .getLogger(ApolloDynamicConfigManagerTest.class); + private final static Logger logger = LoggerFactory + .getLogger(ApolloDynamicConfigManagerTest.class); - private ApolloDynamicConfigManager apolloDynamicConfigManager = new ApolloDynamicConfigManager("test"); + private DynamicConfigManager apolloDynamicConfigManager = DynamicConfigManagerFactory.getDynamicManager("test", + "apollo"); @Test public void getProviderServiceProperty() { @@ -37,17 +40,19 @@ public void getProviderServiceProperty() { @Test public void getConsumerServiceProperty() { + String result = apolloDynamicConfigManager.getConsumerServiceProperty("serviceName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); } @Test public void getProviderMethodProperty() { + String result = apolloDynamicConfigManager.getProviderMethodProperty("serviceName", "methodName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); } @Test public void getConsumerMethodProperty() { - } - - @Test - public void getServiceAuthRule() { + String result = apolloDynamicConfigManager.getConsumerMethodProperty("serviceName", "methodName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); } } \ No newline at end of file diff --git a/config/config-nacos/pom.xml b/config/config-nacos/pom.xml new file mode 100644 index 000000000..4e0d8740b --- /dev/null +++ b/config/config-nacos/pom.xml @@ -0,0 +1,107 @@ + + + 4.0.0 + + + com.alipay.sofa + sofa-rpc-config + ${revision} + + + sofa-rpc-config-nacos + + + + com.alipay.sofa + sofa-rpc-log-common-tools + + + com.alipay.sofa + sofa-rpc-log + + + com.alipay.sofa + sofa-rpc-api + + + + com.alibaba.nacos + nacos-client + + + org.slf4j + slf4j-log4j12 + test + + + junit + junit + test + + + + + + src/main/java + + + src/main/resources + false + + **/** + + + + src/test/java + + + src/test/resources + false + + **/** + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + ${maven.compiler.source} + ${maven.compiler.source} + ${project.build.sourceEncoding} + + + + org.apache.maven.plugins + maven-install-plugin + + ${module.install.skip} + + + + org.apache.maven.plugins + maven-deploy-plugin + + ${module.deploy.skip} + + + + org.apache.maven.plugins + maven-surefire-plugin + + ${skipTests} + + + **/*Test.java + + + once + + + + + diff --git a/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java b/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java new file mode 100644 index 000000000..366681927 --- /dev/null +++ b/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.dynamic.nacos; + +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.listener.AbstractSharedListener; +import com.alipay.sofa.common.config.SofaConfigs; +import com.alipay.sofa.rpc.auth.AuthRuleGroup; +import com.alipay.sofa.rpc.common.utils.StringUtils; +import com.alipay.sofa.rpc.dynamic.ConfigChangeType; +import com.alipay.sofa.rpc.dynamic.ConfigChangedEvent; +import com.alipay.sofa.rpc.dynamic.DynamicConfigKeyHelper; +import com.alipay.sofa.rpc.dynamic.DynamicConfigKeys; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; +import com.alipay.sofa.rpc.dynamic.DynamicHelper; +import com.alipay.sofa.rpc.ext.Extension; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.ConfigFactory; +import com.alipay.sofa.rpc.listener.ConfigListener; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; + +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; +/** + * @author Narziss + * @version NaocsDynamicConfigManager.java, v 0.1 2024年07月26日 09:37 Narziss + */ + +@Extension(value = "nacos", override = true) +public class NacosDynamicConfigManager extends DynamicConfigManager { + + private final static Logger LOGGER = LoggerFactory.getLogger(NacosDynamicConfigManager.class); + + private static final long DEFAULT_TIMEOUT = 5000; + + private final String group; + + private final ConcurrentMap watchListenerMap = new ConcurrentHashMap<>(); + + private ConfigService configService; + + protected NacosDynamicConfigManager(String appName) { + super(appName, SofaConfigs.getOrCustomDefault(DynamicConfigKeys.CONFIG_CENTER_ADDRESS, "nacos://127.0.0.1:8848")); + group = appName; + Properties nacosConfig = new Properties(); + nacosConfig.put(PropertyKeyConst.SERVER_ADDR, getDynamicUrl().getAddress()); + nacosConfig.putAll(getDynamicUrl().getParams()); + try { + configService = ConfigFactory.createConfigService(nacosConfig); + } catch (Exception e) { + LOGGER.error("Failed to create ConfigService", e); + } + } + + @Override + public void initServiceConfiguration(String service) { + // TODO 暂不支持 + } + + @Override + public void initServiceConfiguration(String service, ConfigListener listener) { + try { + String rawConfig = configService.getConfig(service, group, DEFAULT_TIMEOUT); + if (!StringUtils.isEmpty(rawConfig)) { + listener.process(new ConfigChangedEvent(service, rawConfig)); + } + } catch (Exception e) { + LOGGER.error("Failed to getConfig for key:{}, group:{}", service, group, e); + } + } + + @Override + public String getProviderServiceProperty(String service, String key) { + try { + String configValue = configService.getConfig( + DynamicConfigKeyHelper.buildProviderServiceProKey(service, key), + group, DEFAULT_TIMEOUT); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (Exception e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public String getConsumerServiceProperty(String service, String key) { + try { + String configValue = configService.getConfig( + DynamicConfigKeyHelper.buildConsumerServiceProKey(service, key), + group, DEFAULT_TIMEOUT); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (Exception e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public String getProviderMethodProperty(String service, String method, String key) { + try { + String configValue = configService.getConfig( + DynamicConfigKeyHelper.buildProviderMethodProKey(service, method, key), + group, DEFAULT_TIMEOUT); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (Exception e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public String getConsumerMethodProperty(String service, String method, String key) { + try { + String configValue = configService.getConfig( + DynamicConfigKeyHelper.buildConsumerMethodProKey(service, method, key), + group, DEFAULT_TIMEOUT); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (Exception e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public AuthRuleGroup getServiceAuthRule(String service) { + //TODO 暂不支持 + return null; + } + + @Override + public void addListener(String key, ConfigListener listener) { + NacosConfigListener nacosConfigListener = watchListenerMap.computeIfAbsent( + key, k -> createTargetListener(key)); + nacosConfigListener.addListener(listener); + try { + configService.addListener(key, group, nacosConfigListener); + } catch (Exception e) { + LOGGER.error("Failed to add listener for key:{}, group:{}", key, group, e); + } + } + + private NacosConfigListener createTargetListener(String key) { + NacosConfigListener configListener = new NacosConfigListener(); + configListener.fillContext(key, group); + return configListener; + } + + public static class NacosConfigListener extends AbstractSharedListener { + + private final Set listeners = new CopyOnWriteArraySet<>(); + + /** + * cache data to store old value + */ + private final Map cacheData = new ConcurrentHashMap<>(); + + /** + * receive config change event + * + * @param dataId data ID + * @param group group + * @param configInfo content + */ + @Override + public void innerReceive(String dataId, String group, String configInfo) { + String oldValue = cacheData.get(dataId); + ConfigChangedEvent event = + new ConfigChangedEvent(dataId, configInfo, getChangeType(configInfo, oldValue)); + if (configInfo == null) { + cacheData.remove(dataId); + } else { + cacheData.put(dataId, configInfo); + } + listeners.forEach(listener -> listener.process(event)); + } + + void addListener(ConfigListener configListener) { + + this.listeners.add(configListener); + } + + private ConfigChangeType getChangeType(String configInfo, String oldValue) { + if (StringUtils.isBlank(configInfo)) { + return ConfigChangeType.DELETED; + } + if (StringUtils.isBlank(oldValue)) { + return ConfigChangeType.ADDED; + } + return ConfigChangeType.MODIFIED; + } + } +} \ No newline at end of file diff --git a/config/config-nacos/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager b/config/config-nacos/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager new file mode 100644 index 000000000..c055a88ae --- /dev/null +++ b/config/config-nacos/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager @@ -0,0 +1,2 @@ +nacos=com.alipay.sofa.rpc.dynamic.nacos.NacosDynamicConfigManager + diff --git a/config/config-nacos/src/test/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManagerTest.java b/config/config-nacos/src/test/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManagerTest.java new file mode 100644 index 000000000..f880cdf7b --- /dev/null +++ b/config/config-nacos/src/test/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManagerTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.dynamic.nacos; + +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory; +import com.alipay.sofa.rpc.dynamic.DynamicHelper; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import org.junit.Assert; +import org.junit.Test; + +public class NacosDynamicConfigManagerTest { + + private final static Logger logger = LoggerFactory + .getLogger(NacosDynamicConfigManagerTest.class); + + private DynamicConfigManager nacosDynamicConfigManager = DynamicConfigManagerFactory.getDynamicManager("test", + "nacos"); + + @Test + public void getProviderServiceProperty() { + String result = nacosDynamicConfigManager.getProviderServiceProperty("serviceName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } + + @Test + public void getConsumerServiceProperty() { + String result = nacosDynamicConfigManager.getConsumerServiceProperty("serviceName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } + + @Test + public void getProviderMethodProperty() { + String result = nacosDynamicConfigManager.getProviderMethodProperty("serviceName", "methodName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } + + @Test + public void getConsumerMethodProperty() { + String result = nacosDynamicConfigManager.getConsumerMethodProperty("serviceName", "methodName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } +} \ No newline at end of file diff --git a/config/config-nacos/src/test/resources/log4j.xml b/config/config-nacos/src/test/resources/log4j.xml new file mode 100644 index 000000000..e95634f16 --- /dev/null +++ b/config/config-nacos/src/test/resources/log4j.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/config/config-zk/pom.xml b/config/config-zk/pom.xml new file mode 100644 index 000000000..8701b83b0 --- /dev/null +++ b/config/config-zk/pom.xml @@ -0,0 +1,117 @@ + + + 4.0.0 + + + com.alipay.sofa + sofa-rpc-config + ${revision} + + + sofa-rpc-config-zk + + + + com.alipay.sofa + sofa-rpc-log-common-tools + + + com.alipay.sofa + sofa-rpc-log + + + com.alipay.sofa + sofa-rpc-api + + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-x-discovery + + + org.apache.zookeeper + zookeeper + + + + org.slf4j + slf4j-log4j12 + test + + + junit + junit + test + + + + + + + src/main/java + + + src/main/resources + false + + **/** + + + + src/test/java + + + src/test/resources + false + + **/** + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + ${maven.compiler.source} + ${maven.compiler.source} + ${project.build.sourceEncoding} + + + + org.apache.maven.plugins + maven-install-plugin + + ${module.install.skip} + + + + org.apache.maven.plugins + maven-deploy-plugin + + ${module.deploy.skip} + + + + org.apache.maven.plugins + maven-surefire-plugin + + ${skipTests} + + + **/*Test.java + + + once + + + + + diff --git a/config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java b/config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java new file mode 100644 index 000000000..b1539eeb0 --- /dev/null +++ b/config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.dynamic.zk; + +import com.alipay.sofa.common.config.SofaConfigs; +import com.alipay.sofa.rpc.auth.AuthRuleGroup; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.common.utils.StringUtils; +import com.alipay.sofa.rpc.dynamic.ConfigChangeType; +import com.alipay.sofa.rpc.dynamic.ConfigChangedEvent; +import com.alipay.sofa.rpc.dynamic.DynamicConfigKeyHelper; +import com.alipay.sofa.rpc.dynamic.DynamicConfigKeys; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; +import com.alipay.sofa.rpc.dynamic.DynamicHelper; +import com.alipay.sofa.rpc.ext.Extension; +import com.alipay.sofa.rpc.listener.ConfigListener; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.*; +import org.apache.curator.retry.ExponentialBackoffRetry; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; + +import static com.alipay.sofa.common.config.SofaConfigs.getOrDefault; +import static com.alipay.sofa.rpc.common.utils.StringUtils.CONTEXT_SEP; + +/** + * @author Narziss + * @version ZookeeperDynamicConfigManager.java, v 0.1 2024年07月20日 09:23 Narziss + */ + +@Extension(value = "zookeeper", override = true) +public class ZookeeperDynamicConfigManager extends DynamicConfigManager { + + private final static Logger LOGGER = LoggerFactory + .getLogger(ZookeeperDynamicConfigManager.class); + + private final CuratorFramework zkClient; + + private final String rootPath; + private final ConcurrentMap watchListenerMap = new ConcurrentHashMap<>(); + private final ConcurrentMap configMap = new ConcurrentHashMap<>(); + + protected ZookeeperDynamicConfigManager(String appName) { + super(appName, SofaConfigs.getOrCustomDefault(DynamicConfigKeys.CONFIG_CENTER_ADDRESS, "zookeeper://127.0.0.1:2181")); + rootPath = CONTEXT_SEP + DynamicConfigKeys.CONFIG_NODE + CONTEXT_SEP + appName; + zkClient = CuratorFrameworkFactory.builder() + .connectString(getDynamicUrl().getAddress()) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)) + .namespace(DynamicConfigKeys.DEFAULT_NAMESPACE) + .build(); + zkClient.start(); + + if (!getOrDefault(DynamicConfigKeys.DYNAMIC_REFRESH_ENABLE)) { + PathChildrenCache cache = new PathChildrenCache(zkClient, rootPath, true); + cache.getListenable().addListener(new PathChildrenCacheListener() { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + switch (event.getType()) { + case CHILD_ADDED: + case CHILD_UPDATED: + String key = event.getData().getPath().substring(rootPath.length() + 1); + String value = new String(event.getData().getData()); + configMap.put(key, value); + LOGGER.info("Receive zookeeper event: " + "type=[" + event.getType() + "] key=[" + key + "] value=[" + value + "]"); + break; + case CHILD_REMOVED: + key = event.getData().getPath().substring(rootPath.length() + 1); + configMap.remove(key); + LOGGER.info("Receive zookeeper event: " + "type=[" + event.getType() + "] key=[" + key + "]"); + break; + default: + break; + } + } + }); + try { + cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + } catch (Exception e) { + LOGGER.error("setupPathChildrenCache error", e); + } + } + } + + @Override + public void initServiceConfiguration(String service) { + // TODO 暂不支持 + } + + @Override + public void initServiceConfiguration(String service, ConfigListener listener) { + try { + String path = rootPath + CONTEXT_SEP + service; + if (zkClient.checkExists().forPath(path) != null) { + byte[] bytes = zkClient.getData().forPath(rootPath + CONTEXT_SEP + service); + String rawConfig = new String(bytes, RpcConstants.DEFAULT_CHARSET); + if (!StringUtils.isEmpty(rawConfig)) { + listener.process(new ConfigChangedEvent(service, rawConfig)); + } + } + } catch (Exception e) { + LOGGER.error("Failed to init service configuration for service: " + service, e); + } + + } + + @Override + public String getProviderServiceProperty(String service, String key) { + try { + String configValue = configMap.get(DynamicConfigKeyHelper.buildProviderServiceProKey(service, key)); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (Exception e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public String getConsumerServiceProperty(String service, String key) { + try { + String configValue = configMap.get(DynamicConfigKeyHelper.buildConsumerServiceProKey(service, key)); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (Exception e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public String getProviderMethodProperty(String service, String method, String key) { + try { + String configValue = configMap.get(DynamicConfigKeyHelper.buildProviderMethodProKey(service, method, key)); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (Exception e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public String getConsumerMethodProperty(String service, String method, String key) { + try { + String configValue = configMap.get(DynamicConfigKeyHelper.buildConsumerMethodProKey(service, method, key)); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (Exception e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + + } + + @Override + public AuthRuleGroup getServiceAuthRule(String service) { + //TODO 暂不支持 + return null; + } + + @Override + public void addListener(String key, ConfigListener listener) { + String pathKey = rootPath + CONTEXT_SEP + key; + + ZookeeperConfigListener zookeeperConfigListener = watchListenerMap.computeIfAbsent( + key, k -> createTargetListener(pathKey)); + + zookeeperConfigListener.addListener(listener); + } + + private ZookeeperConfigListener createTargetListener(String pathKey) { + ZookeeperConfigListener configListener = new ZookeeperConfigListener(pathKey); + return configListener; + } + + public class ZookeeperConfigListener implements NodeCacheListener { + + private final String pathKey; + private final Set listeners = new CopyOnWriteArraySet<>(); + private final NodeCache nodeCache; + + public ZookeeperConfigListener(String pathKey) { + this.pathKey = pathKey; + this.nodeCache = new NodeCache(zkClient, pathKey); + nodeCache.getListenable().addListener(this); + try { + nodeCache.start(); + } catch (Exception e) { + LOGGER.error("Failed to add listener for path:{}", pathKey, e); + } + } + + public void addListener(ConfigListener configListener) { + listeners.add(configListener); + } + + @Override + public void nodeChanged() throws Exception { + ChildData childData = nodeCache.getCurrentData(); + String content = null; + ConfigChangeType changeType; + if (childData == null) { + changeType = ConfigChangeType.DELETED; + + } else if (childData.getStat().getVersion() == 0) { + content = new String(childData.getData(), RpcConstants.DEFAULT_CHARSET); + changeType = ConfigChangeType.ADDED; + } else { + content = new String(childData.getData(), RpcConstants.DEFAULT_CHARSET); + changeType = ConfigChangeType.MODIFIED; + } + ConfigChangedEvent configChangeEvent = new ConfigChangedEvent(pathKey, (String) content, changeType); + listeners.forEach(listener -> listener.process(configChangeEvent)); + + } + } + + +} \ No newline at end of file diff --git a/config/config-zk/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager b/config/config-zk/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager new file mode 100644 index 000000000..5f9a8243e --- /dev/null +++ b/config/config-zk/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager @@ -0,0 +1 @@ +zookeeper=com.alipay.sofa.rpc.dynamic.zk.ZookeeperDynamicConfigManager diff --git a/config/config-zk/src/test/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManagerTest.java b/config/config-zk/src/test/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManagerTest.java new file mode 100644 index 000000000..9ccc3bf9f --- /dev/null +++ b/config/config-zk/src/test/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManagerTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.dynamic.zk; + +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory; +import com.alipay.sofa.rpc.dynamic.DynamicHelper; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import org.junit.Assert; +import org.junit.Test; + +public class ZookeeperDynamicConfigManagerTest { + + private final static Logger logger = LoggerFactory + .getLogger(ZookeeperDynamicConfigManager.class); + + private DynamicConfigManager zookeeperDynamicConfigManager = DynamicConfigManagerFactory.getDynamicManager( + "test", "zookeeper"); + + @Test + public void getProviderServiceProperty() { + String result = zookeeperDynamicConfigManager.getProviderServiceProperty("serviceName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } + + @Test + public void getConsumerServiceProperty() { + String result = zookeeperDynamicConfigManager.getConsumerServiceProperty("serviceName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } + + @Test + public void getProviderMethodProperty() { + String result = zookeeperDynamicConfigManager.getProviderMethodProperty("serviceName", "methodName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } + + @Test + public void getConsumerMethodProperty() { + String result = zookeeperDynamicConfigManager.getConsumerMethodProperty("serviceName", "methodName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } +} \ No newline at end of file diff --git a/config/config-zk/src/test/resources/log4j.xml b/config/config-zk/src/test/resources/log4j.xml new file mode 100644 index 000000000..e95634f16 --- /dev/null +++ b/config/config-zk/src/test/resources/log4j.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/config/pom.xml b/config/pom.xml index 7d0caacb8..2e380e67e 100644 --- a/config/pom.xml +++ b/config/pom.xml @@ -15,6 +15,8 @@ config-apollo + config-zk + config-nacos diff --git a/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java b/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java index 2db6a6b1f..81d4cfa40 100644 --- a/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java +++ b/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java @@ -689,7 +689,7 @@ private SofaResponse buildEmptyResponse(SofaRequest request) { * @return 调用超时 */ private int resolveTimeout(SofaRequest request, ConsumerConfig consumerConfig, ProviderInfo providerInfo) { - // 动态配置优先 + // 请求级别动态配置优先 final String dynamicAlias = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_ALIAS); if (StringUtils.isNotBlank(dynamicAlias)) { String dynamicTimeout = null; @@ -707,6 +707,7 @@ private int resolveTimeout(SofaRequest request, ConsumerConfig consumerConfig, P return Integer.parseInt(dynamicTimeout); } } + // 先去调用级别配置 Integer timeout = request.getTimeout(); if (timeout == null || timeout <= 0) { diff --git a/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/lb/AutoLoadBalancer.java b/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/lb/AutoLoadBalancer.java index 9bf01c75e..27d92a90d 100644 --- a/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/lb/AutoLoadBalancer.java +++ b/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/lb/AutoLoadBalancer.java @@ -53,7 +53,7 @@ public AutoLoadBalancer(ConsumerBootstrap consumerBootstrap) { @Override protected ProviderInfo doSelect(SofaRequest request, List providerInfos) { - // 动态配置优先 + // 请求级别动态配置优先 final String dynamicAlias = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_ALIAS); if (StringUtils.isNotBlank(dynamicAlias)) { String dynamicLoadBalancer = null; diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/config/AbstractInterfaceConfig.java b/core/api/src/main/java/com/alipay/sofa/rpc/config/AbstractInterfaceConfig.java index acc4a74e1..f3a236e09 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/config/AbstractInterfaceConfig.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/config/AbstractInterfaceConfig.java @@ -215,6 +215,11 @@ public abstract class AbstractInterfaceConfig configValueCache = null; + /** + * 动态配置的方法名称和方法参数配置的map + */ + protected transient volatile Map dynamicConfigValueCache = new ConcurrentHashMap<>(); + /** * 代理接口类,和T对应,主要针对泛化调用 */ @@ -703,6 +708,15 @@ public Map getConfigValueCache() { return configValueCache; } + /** + * Gets dynamic config value cache. + * + * @return the dynamic config value cache + */ + public Map getDynamicConfigValueCache() { + return dynamicConfigValueCache; + } + /** * Sets config listener. * @@ -930,7 +944,7 @@ public boolean updateAttribute(String property, String newValueStr, boolean over int index = methodAndP.indexOf(RpcConstants.HIDE_KEY_PREFIX); if (index <= 0) { throw ExceptionUtils.buildRuntime(property, newValueStr, - "Unknown update attribute key!"); + "Unknown update attribute key!"); } String methodName = methodAndP.substring(0, index); String methodProperty = methodAndP.substring(index + 1); @@ -940,6 +954,9 @@ public boolean updateAttribute(String property, String newValueStr, boolean over // 拿到旧的值 Object oldValue = null; Object newValue = CompatibleTypeUtils.convert(newValueStr, propertyClazz); + if (dynamicConfigValueCache.containsKey(property)) { + dynamicConfigValueCache.put(property, newValue); + } if (methodConfig == null) { methodConfig = new MethodConfig(); methodConfig.setName(methodName); @@ -962,7 +979,7 @@ public boolean updateAttribute(String property, String newValueStr, boolean over BeanUtils.setProperty(methodConfig, methodProperty, propertyClazz, newValue);// 覆盖属性 if (LOGGER.isInfoEnabled()) { LOGGER.info("Property \"" + methodName + "." + methodProperty + "\" changed from {} to {}", - oldValue, newValueStr); + oldValue, newValueStr); } } } else { // 接口级配置 例如timeout @@ -972,6 +989,9 @@ public boolean updateAttribute(String property, String newValueStr, boolean over // 拿到旧的值 Object oldValue = BeanUtils.getProperty(this, property, propertyClazz); Object newValue = CompatibleTypeUtils.convert(newValueStr, propertyClazz); + if (dynamicConfigValueCache.containsKey(property)) { + dynamicConfigValueCache.put(property, newValue); + } if (oldValue == null) { if (newValueStr != null) { changed = true; @@ -991,7 +1011,7 @@ public boolean updateAttribute(String property, String newValueStr, boolean over throw e; } catch (Exception e) { throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_UPDATE_ATTRIBUTE, property, newValueStr), - e); + e); } } @@ -1016,11 +1036,17 @@ public Object getMethodConfigValue(String methodName, String configKey, Object d * @return 配置值 method config value */ public Object getMethodConfigValue(String methodName, String configKey) { - if (configValueCache == null) { - return null; - } String key = buildmkey(methodName, configKey); - return configValueCache.get(key); + Object value = null; + if (dynamicConfigValueCache != null) { + value = dynamicConfigValueCache.get(key); + } + if (value == null) { + if (configValueCache != null) { + value = configValueCache.get(key); + } + } + return value; } /** diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangeType.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangeType.java new file mode 100644 index 000000000..99f87d457 --- /dev/null +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangeType.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.dynamic; + +/** + * @author Narziss + * @version ConfigChangeType.java, v 0.1 2024年09月15日 20:20 Narziss + */ + +public enum ConfigChangeType { + /** + * A config is created. + */ + ADDED, + + /** + * A config is updated. + */ + MODIFIED, + + /** + * A config is deleted. + */ + DELETED +} diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangedEvent.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangedEvent.java new file mode 100644 index 000000000..220f2889d --- /dev/null +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangedEvent.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.dynamic; + +import com.alipay.sofa.rpc.common.utils.StringUtils; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; + +import java.util.EventObject; +import java.util.HashMap; +import java.util.Map; + +/** + * @author Narziss + * @version ConfigChangedEvent.java, v 0.1 2024年09月15日 20:12 Narziss + */ + +public class ConfigChangedEvent extends EventObject { + + private final static Logger LOGGER = LoggerFactory.getLogger(ConfigChangedEvent.class); + + private final String key; + + private final String content; + + private final ConfigChangeType changeType; + + private final Map dynamicValueMap = new HashMap<>(); + + public ConfigChangedEvent(String key, String content) { + this(key, content, ConfigChangeType.MODIFIED); + } + + public ConfigChangedEvent(String key, String content, ConfigChangeType changeType) { + super(key); + this.key = key; + this.content = content; + this.changeType = changeType; + if (StringUtils.isNotBlank(content)) { + parseConfigurationLines(content); + } + } + + private void parseConfigurationLines(String content) { + String[] lines = content.split(System.lineSeparator()); + for (String line : lines) { + String[] keyValue = line.split("=", 2); + if (keyValue.length == 2) { + String mapKey = keyValue[0].trim(); + String mapValue = keyValue[1].trim(); + dynamicValueMap.put(mapKey, mapValue); + } else { + LOGGER.warn("Malformed configuration line: {}", line); + } + } + } + + public String getKey() { + return key; + } + + public String getContent() { + return content; + } + + public ConfigChangeType getChangeType() { + return changeType; + } + + public Map getDynamicValueMap() { + return dynamicValueMap; + } + +} diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java index d8c49dcec..44bb51901 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java @@ -16,10 +16,33 @@ */ package com.alipay.sofa.rpc.dynamic; +import com.alipay.sofa.common.config.ConfigKey; + /** * @author bystander * @version : DynamicConfigKeys.java, v 0.1 2019年04月17日 21:51 bystander Exp $ */ public class DynamicConfigKeys { - public static final String DYNAMIC_ALIAS = "dynamicAlias"; + public static final String DYNAMIC_ALIAS = "dynamicAlias"; + + public static final String CONFIG_NODE = "config"; + + public static final String DEFAULT_NAMESPACE = "sofa-rpc"; + + public static ConfigKey CONFIG_CENTER_ADDRESS = ConfigKey + .build( + "sofa.rpc.config.center.address", + " ", + false, + "The url of the dynamic configuration.", + new String[] { "sofa_rpc_config_center_address" }); + + public static ConfigKey DYNAMIC_REFRESH_ENABLE = ConfigKey + .build( + "sofa.rpc.config.dynamic.refresh.enable", + false, + false, + "Switch for dynamic configuration refresh.", + new String[] { "sofa_rpc_config_dynamic_refresh_enable" }); + } \ No newline at end of file diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java index f44818df8..8c7ac5fa6 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java @@ -17,22 +17,40 @@ package com.alipay.sofa.rpc.dynamic; import com.alipay.sofa.rpc.auth.AuthRuleGroup; +import com.alipay.sofa.rpc.common.utils.StringUtils; import com.alipay.sofa.rpc.ext.Extensible; +import com.alipay.sofa.rpc.listener.ConfigListener; /** - * * @author bystander * @version : DynamicManager.java, v 0.1 2019年04月12日 11:35 bystander Exp $ */ @Extensible(singleton = true) public abstract class DynamicConfigManager { - private String appName; + private String appName; + + private DynamicUrl dynamicUrl; protected DynamicConfigManager(String appName) { this.appName = appName; } + protected DynamicConfigManager(String appName, String configCenterAddress) { + this.appName = appName; + if (StringUtils.isNotBlank(configCenterAddress)) { + this.dynamicUrl = new DynamicUrl(configCenterAddress); + } + } + + protected String getAppName() { + return appName; + } + + protected DynamicUrl getDynamicUrl() { + return dynamicUrl; + } + /** * Init service's governance related configuration. * Such as auth rules、lb rules @@ -41,11 +59,21 @@ protected DynamicConfigManager(String appName) { */ public abstract void initServiceConfiguration(String service); + /** + * Init service's governance related configuration. + * Such as auth rules、lb rules + * + * @param service target service + * @param listener config listener + */ + public void initServiceConfiguration(String service, ConfigListener listener) { + } + /** * Get provider service related property. * * @param service target service - * @param key property key + * @param key property key * @return property value */ public abstract String getProviderServiceProperty(String service, String key); @@ -54,7 +82,7 @@ protected DynamicConfigManager(String appName) { * Get consumer service related property. * * @param service target service - * @param key property key + * @param key property key * @return property value */ public abstract String getConsumerServiceProperty(String service, String key); @@ -63,8 +91,8 @@ protected DynamicConfigManager(String appName) { * Get provider method related property. * * @param service target service - * @param method target method - * @param key property key + * @param method target method + * @param key property key * @return property value */ public abstract String getProviderMethodProperty(String service, String method, String key); @@ -73,8 +101,8 @@ protected DynamicConfigManager(String appName) { * Get consumer method related property. * * @param service target service - * @param method target method - * @param key property key + * @param method target method + * @param key property key * @return property value */ public abstract String getConsumerMethodProperty(String service, String method, String key); @@ -86,4 +114,14 @@ protected DynamicConfigManager(String appName) { * @return auth rules */ public abstract AuthRuleGroup getServiceAuthRule(String service); + + /** + * Add config listener. + * + * @param key config key + * @param listener config listener + */ + public void addListener(String key, ConfigListener listener) { + } + } \ No newline at end of file diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManagerFactory.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManagerFactory.java index d3cee7245..5b29646f2 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManagerFactory.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManagerFactory.java @@ -51,7 +51,7 @@ public class DynamicConfigManagerFactory { .getLogger(DynamicConfigManagerFactory.class); /** - * 得到动态配置管理 + * 得到动态配置管理器 * * @param alias 别名 * @return DynamicManager 实现 diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicUrl.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicUrl.java new file mode 100644 index 000000000..b83ffc3d7 --- /dev/null +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicUrl.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.dynamic; + +import java.util.Map; +import java.util.HashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * @author Narziss + * @version DynamicUrl.java, v 0.1 2024年10月28日 21:23 Narziss + */ +public class DynamicUrl { + + private final String originalUrl; + private final String protocol; + private final String address; + private final String host; + private final int port; + private final String path; + private final Map params = new HashMap<>(); + + /** + * @param configCenterAddress example: apollo://127.0.0.1:8080/config?appId=xxx&cluster=yyy + */ + public DynamicUrl(String configCenterAddress) { + this.originalUrl = configCenterAddress; + // 正则表达式解析协议、主机、端口、路径和参数,其中路径和参数是可选的 + String regex = "^(\\w+)://([^:/]+):(\\d+)(/[^?]*)?(\\?.*)?$"; + Matcher matcher = Pattern.compile(regex).matcher(configCenterAddress); + if (matcher.find()) { + this.protocol = matcher.group(1); + this.host = matcher.group(2); + this.port = Integer.parseInt(matcher.group(3)); + // 判断路径是否为空或者为 "/" + this.path = (matcher.group(4) != null && !matcher.group(4).equals("/")) ? matcher.group(4) : ""; + this.address = this.host + ":" + this.port + this.path; + if (matcher.group(5) != null) { + parseQueryParams(matcher.group(5).substring(1)); + } + } else { + throw new IllegalArgumentException("Invalid URL format"); + } + } + + private void parseQueryParams(String query) { + String[] paramPairs = query.split("&"); + for (String paramPair : paramPairs) { + String[] keyValue = paramPair.split("=", 2); + if (keyValue.length == 2) { + params.put(keyValue[0], keyValue[1]); + } + } + } + + public String getOriginalUrl() { + return originalUrl; + } + + public String getProtocol() { + return protocol; + } + + public String getAddress() { + return address; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getPath() { + return path; + } + + public Map getParams() { + return params; + } + + public String getParam(String key) { + return params.get(key); + } +} diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/listener/ConfigListener.java b/core/api/src/main/java/com/alipay/sofa/rpc/listener/ConfigListener.java index 7fb03ac2b..0c7685350 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/listener/ConfigListener.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/listener/ConfigListener.java @@ -16,6 +16,8 @@ */ package com.alipay.sofa.rpc.listener; +import com.alipay.sofa.rpc.dynamic.ConfigChangedEvent; + import java.util.Map; /** @@ -25,6 +27,15 @@ */ public interface ConfigListener { + /** + * 处理配置变更事件 + * + * @param event 配置变更事件 + */ + default void process(ConfigChangedEvent event){ + // do nothing + } + /** * 配置发生变化,例如 * diff --git a/core/api/src/test/java/com/alipay/sofa/rpc/dynamic/DynamicUrlTest.java b/core/api/src/test/java/com/alipay/sofa/rpc/dynamic/DynamicUrlTest.java new file mode 100644 index 000000000..86f2cd187 --- /dev/null +++ b/core/api/src/test/java/com/alipay/sofa/rpc/dynamic/DynamicUrlTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.dynamic; + +import org.junit.Test; +import org.junit.Assert; + +/** + * @author Narziss + * @version DynamicUrl.java, v 0.1 2024年10月28日 21:40 Narziss + */ +public class DynamicUrlTest { + + @Test + public void testWithPathAndParams() { + DynamicUrl dynamicUrl = new DynamicUrl("apollo://127.0.0.1:8080/config?appId=xxx&cluster=yyy"); + Assert.assertEquals("apollo", dynamicUrl.getProtocol()); + Assert.assertEquals("127.0.0.1", dynamicUrl.getHost()); + Assert.assertEquals(8080, dynamicUrl.getPort()); + Assert.assertEquals("/config", dynamicUrl.getPath()); + Assert.assertEquals("127.0.0.1:8080/config", dynamicUrl.getAddress()); + Assert.assertNotNull(dynamicUrl.getParams()); + Assert.assertEquals("xxx", dynamicUrl.getParams().get("appId")); + Assert.assertEquals("yyy", dynamicUrl.getParams().get("cluster")); + } + + @Test + public void testWithSlashAndParams() { + DynamicUrl dynamicUrl = new DynamicUrl("apollo://127.0.0.1:8080/?appId=xxx&cluster=yyy"); + Assert.assertEquals("apollo", dynamicUrl.getProtocol()); + Assert.assertEquals("127.0.0.1", dynamicUrl.getHost()); + Assert.assertEquals(8080, dynamicUrl.getPort()); + Assert.assertEquals("", dynamicUrl.getPath());// 如果路径为空,返回空字符串 + Assert.assertEquals("127.0.0.1:8080", dynamicUrl.getAddress()); + Assert.assertNotNull(dynamicUrl.getParams()); + Assert.assertEquals("xxx", dynamicUrl.getParams().get("appId")); + Assert.assertEquals("yyy", dynamicUrl.getParams().get("cluster")); + } + + @Test + public void testWithParams() { + DynamicUrl dynamicUrl = new DynamicUrl("apollo://127.0.0.1:8080?appId=xxx&cluster=yyy"); + Assert.assertEquals("apollo", dynamicUrl.getProtocol()); + Assert.assertEquals("127.0.0.1", dynamicUrl.getHost()); + Assert.assertEquals(8080, dynamicUrl.getPort()); + Assert.assertEquals("", dynamicUrl.getPath()); + Assert.assertEquals("127.0.0.1:8080", dynamicUrl.getAddress()); + Assert.assertNotNull(dynamicUrl.getParams()); + Assert.assertEquals("xxx", dynamicUrl.getParams().get("appId")); + Assert.assertEquals("yyy", dynamicUrl.getParams().get("cluster")); + } + + @Test + public void testOnlyHostAndPort() { + DynamicUrl dynamicUrl = new DynamicUrl("apollo://127.0.0.1:8080"); + Assert.assertEquals("apollo", dynamicUrl.getProtocol()); + Assert.assertEquals("127.0.0.1", dynamicUrl.getHost()); + Assert.assertEquals(8080, dynamicUrl.getPort()); + Assert.assertEquals("", dynamicUrl.getPath()); + Assert.assertEquals("127.0.0.1:8080", dynamicUrl.getAddress()); + System.out.println(dynamicUrl.getParams()); + Assert.assertNotNull(dynamicUrl.getParams()); + Assert.assertTrue(dynamicUrl.getParams().isEmpty()); + } +} diff --git a/test/test-integration/pom.xml b/test/test-integration/pom.xml index 87aef8ddc..6e3d254a3 100644 --- a/test/test-integration/pom.xml +++ b/test/test-integration/pom.xml @@ -201,7 +201,7 @@ com.alipay.sofa - sofa-rpc-config-apollo + sofa-rpc-config-nacos ${project.parent.version} test @@ -261,6 +261,24 @@ junit test + + com.alipay.sofa + sofa-rpc-config-apollo + ${project.parent.version} + test + + + com.alipay.sofa + sofa-rpc-config-zk + ${project.parent.version} + test + + + org.apache.curator + curator-test + 4.3.0 + test + diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ApolloDynamicConfigTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ApolloDynamicConfigTest.java new file mode 100644 index 000000000..4a9bbf20f --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ApolloDynamicConfigTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.config; + +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.dynamic.DynamicConfigKeys; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory; +import com.alipay.sofa.rpc.dynamic.apollo.ApolloDynamicConfigManager; +import com.alipay.sofa.rpc.test.HelloService; +import com.ctrip.framework.apollo.enums.PropertyChangeType; +import com.ctrip.framework.apollo.model.ConfigChange; +import com.ctrip.framework.apollo.model.ConfigChangeEvent; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; + +/** + * @author Narziss + * @version ApolloDynamicConfigTest.java, v 0.1 2024年09月28日 10:46 Narziss + */ +public class ApolloDynamicConfigTest { + + @Test + public void testApolloDynamicConfig() throws Exception { + System.setProperty(DynamicConfigKeys.DYNAMIC_REFRESH_ENABLE.getKey(), "true"); + System.setProperty(DynamicConfigKeys.CONFIG_CENTER_ADDRESS.getKey(), "apollo://127.0.0.1:8080?appId=demo"); + ApplicationConfig clientApplication = new ApplicationConfig(); + clientApplication.setAppName("demo"); + + ConsumerConfig consumerConfig = new ConsumerConfig() + .setInterfaceId(HelloService.class.getName()) + .setProtocol("bolt") + .setDirectUrl("bolt://127.0.0.1:12200") + .setConnectTimeout(10 * 1000) + .setApplication(clientApplication); + + consumerConfig.refer(); + + // 获取接口对应的动态配置监听器 + DynamicConfigManager dynamicConfigManager = DynamicConfigManagerFactory.getDynamicManager + (clientApplication.getAppName(), "apollo"); + Field field = ApolloDynamicConfigManager.class.getDeclaredField("watchListenerMap"); + field.setAccessible(true); + Map watchListenerMap = (Map) field + .get(dynamicConfigManager); + ApolloDynamicConfigManager.ApolloListener apolloConfigListener = watchListenerMap.get(consumerConfig + .getInterfaceId()); + + // 测试配置新增 + String configValue = "timeout=5000"+System.lineSeparator(); + ConfigChange configChange = new ConfigChange("application", consumerConfig.getInterfaceId(), null, configValue, PropertyChangeType.ADDED); + Map changes = new HashMap<>(); + changes.put(configChange.getPropertyName(), configChange); + ConfigChangeEvent event = new ConfigChangeEvent("application", changes); + apolloConfigListener.onChange(event); + Assert.assertEquals(5000, consumerConfig.getMethodTimeout("sayHello")); + + // 测试配置修改 + String oldValue = configValue; + configValue = "timeout=5000"+System.lineSeparator()+".sayHello.timeout=6000"; + configChange = new ConfigChange("application", consumerConfig.getInterfaceId(), oldValue, configValue, PropertyChangeType.MODIFIED); + changes = new HashMap<>(); + changes.put(configChange.getPropertyName(), configChange); + event = new ConfigChangeEvent("application", changes); + apolloConfigListener.onChange(event); + Assert.assertEquals(6000, consumerConfig.getMethodTimeout("sayHello")); + + // 测试配置删除 + configChange = new ConfigChange("application", consumerConfig.getInterfaceId(), configValue, null, PropertyChangeType.DELETED); + changes = new HashMap<>(); + changes.put(configChange.getPropertyName(), configChange); + event = new ConfigChangeEvent("application", changes); + apolloConfigListener.onChange(event); + Assert.assertEquals(-1, consumerConfig.getMethodTimeout("sayHello")); + + System.clearProperty(DynamicConfigKeys.CONFIG_CENTER_ADDRESS.getKey()); + } +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/NacosDynamicConfigTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/NacosDynamicConfigTest.java new file mode 100644 index 000000000..4a2b281fc --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/NacosDynamicConfigTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.config; + +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.dynamic.DynamicConfigKeys; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; + +import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory; +import com.alipay.sofa.rpc.dynamic.nacos.NacosDynamicConfigManager; +import com.alipay.sofa.rpc.test.HelloService; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.Map; + +/** + * @author Narziss + * @version NacosDynamicConfigTest.java, v 0.1 2024年09月28日 12:11 Narziss + */ +public class NacosDynamicConfigTest { + + @Test + public void testNacosDynamicConfig() throws Exception { + System.setProperty(DynamicConfigKeys.DYNAMIC_REFRESH_ENABLE.getKey(), "true"); + System.setProperty(DynamicConfigKeys.CONFIG_CENTER_ADDRESS.getKey(), + "nacos://127.0.0.1:8848/sofa-rpc-config?username=nacos&password=nacos"); + ApplicationConfig clientApplication = new ApplicationConfig(); + clientApplication.setAppName("demo"); + + ConsumerConfig consumerConfig = new ConsumerConfig() + .setInterfaceId(HelloService.class.getName()) + .setProtocol("bolt") + .setDirectUrl("bolt://127.0.0.1:12200") + .setConnectTimeout(10 * 1000) + .setApplication(clientApplication); + + consumerConfig.refer(); + + // 获取接口对应的动态配置监听器 + DynamicConfigManager dynamicConfigManager = DynamicConfigManagerFactory.getDynamicManager + (clientApplication.getAppName(), "nacos"); + Field field = NacosDynamicConfigManager.class.getDeclaredField("watchListenerMap"); + field.setAccessible(true); + Map watchListenerMap = (Map) field + .get(dynamicConfigManager); + NacosDynamicConfigManager.NacosConfigListener nacosConfigListener = watchListenerMap.get(consumerConfig + .getInterfaceId()); + + // 测试配置新增 + String configValue = "timeout=5000"; + nacosConfigListener.innerReceive(consumerConfig.getInterfaceId(), consumerConfig.getAppName(), configValue); + Assert.assertEquals(5000, consumerConfig.getMethodTimeout("sayHello")); + // 测试配置修改 + configValue = "timeout=5000" + System.lineSeparator() + ".sayHello.timeout=6000"; + nacosConfigListener.innerReceive(consumerConfig.getInterfaceId(), consumerConfig.getAppName(), configValue); + Assert.assertEquals(6000, consumerConfig.getMethodTimeout("sayHello")); + // 测试配置删除 + configValue = ""; + nacosConfigListener.innerReceive(consumerConfig.getInterfaceId(), consumerConfig.getAppName(), configValue); + Assert.assertEquals(-1, consumerConfig.getMethodTimeout("sayHello")); + + System.clearProperty(DynamicConfigKeys.CONFIG_CENTER_ADDRESS.getKey()); + } +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ZookeeperDynamicConfigTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ZookeeperDynamicConfigTest.java new file mode 100644 index 000000000..a67916e07 --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ZookeeperDynamicConfigTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.config; + +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.dynamic.DynamicConfigKeys; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory; +import com.alipay.sofa.rpc.dynamic.zk.ZookeeperDynamicConfigManager; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import com.alipay.sofa.rpc.test.HelloService; +import com.alipay.sofa.rpc.test.config.base.BaseZkTest; +import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.CreateMode; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; + +/** + * @author Narziss + * @version ZookeeperDynamicConfigTest.java, v 0.1 2024年09月28日 14:33 Narziss + */ +public class ZookeeperDynamicConfigTest extends BaseZkTest { + + Logger logger = LoggerFactory.getLogger(ZookeeperDynamicConfigTest.class); + + @Test + public void testZookeeperDynamicConfig() throws Exception { + System.setProperty(DynamicConfigKeys.DYNAMIC_REFRESH_ENABLE.getKey(), "true"); + System.setProperty(DynamicConfigKeys.CONFIG_CENTER_ADDRESS.getKey(), "zookeeper://127.0.0.1:2181"); + ApplicationConfig clientApplication = new ApplicationConfig(); + clientApplication.setAppName("demo"); + + ConsumerConfig consumerConfig = new ConsumerConfig() + .setInterfaceId(HelloService.class.getName()) + .setProtocol("bolt") + .setDirectUrl("bolt://127.0.0.1:12200") + .setConnectTimeout(10 * 1000) + .setApplication(clientApplication); + + consumerConfig.refer(); + + DynamicConfigManager dynamicConfigManager = DynamicConfigManagerFactory.getDynamicManager + (clientApplication.getAppName(), "zookeeper"); + Field field = ZookeeperDynamicConfigManager.class.getDeclaredField("zkClient"); + field.setAccessible(true); + CuratorFramework zkClient = (CuratorFramework) field.get(dynamicConfigManager); + + // 新增或修改配置节点 + if (zkClient.checkExists().forPath("/config/demo/com.alipay.sofa.rpc.test.HelloService") == null) { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath("/config/demo/com.alipay.sofa.rpc.test.HelloService", "timeout=5000".getBytes()); + } else { + zkClient.setData().forPath("/config/demo/com.alipay.sofa.rpc.test.HelloService", + "timeout=5000".getBytes()); + } + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + // 验证配置是否更新 + Assert.assertEquals(5000, consumerConfig.getMethodTimeout("sayHello")); + + //删除配置节点 + zkClient.delete().forPath("/config/demo/com.alipay.sofa.rpc.test.HelloService"); + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + // 验证配置是否删除 + Assert.assertEquals(-1, consumerConfig.getMethodTimeout("sayHello")); + + System.clearProperty(DynamicConfigKeys.CONFIG_CENTER_ADDRESS.getKey()); + + } +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/base/BaseZkTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/base/BaseZkTest.java new file mode 100644 index 000000000..9268a8f0b --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/base/BaseZkTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.config.base; + +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.context.RpcInvokeContext; +import com.alipay.sofa.rpc.context.RpcRunningState; +import com.alipay.sofa.rpc.context.RpcRuntimeContext; +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; + +/** + * @author Narziss + * @version BaseZkTest.java, v 0.1 2024年10月08日 10:20 Narziss + */ +public abstract class BaseZkTest { + protected static TestingServer server = null; + + @BeforeClass + public static void adBeforeClass() { + RpcRunningState.setUnitTestMode(true); + + try { + server = new TestingServer(2181, true); + server.start(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @AfterClass + public static void adAfterClass() { + RpcRuntimeContext.destroy(); + RpcInternalContext.removeContext(); + RpcInvokeContext.removeContext(); + + if (server != null) { + try { + server.stop(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } +} \ No newline at end of file