diff --git a/build/pom.xml b/build/pom.xml
index 2f072af7de1..71153b65b9d 100644
--- a/build/pom.xml
+++ b/build/pom.xml
@@ -88,6 +88,7 @@
0.2.0-RC2
3.6.1
2.0
+ 1.9.13
5.8.2
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index e83eaa555b8..838c1568e3e 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -5,7 +5,7 @@ Add changes here for all PR submitted to the 2.x branch.
### feature:
- [[#6226](https://github.com/apache/incubator-seata/pull/6226)] multi-version seata protocol support
- [[#6537](https://github.com/apache/incubator-seata/pull/6537)] support Namingserver
-
+- [[#6538](https://github.com/apache/incubator-seata/pull/6538)] Integration of naming server on the Seata server side
### bugfix:
- [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async annotation not working in ClusterWatcherManager
- [[#6624](https://github.com/apache/incubator-seata/pull/6624)] fix Alibaba Dubbo convert error
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 5a651a2acd9..63f9a01fd95 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -5,6 +5,7 @@
### feature:
- [[#6226](https://github.com/apache/incubator-seata/pull/6226)] 支持seata私有协议多版本兼容
- [[#6537](https://github.com/apache/incubator-seata/pull/6537)] 支持 Namingserver
+- [[#6538](https://github.com/apache/incubator-seata/pull/6538)] seata server端集成naming server
### bugfix:
- [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async注解ClusterWatcherManager中不生效的问题
@@ -49,6 +50,7 @@
+
非常感谢以下 contributors 的代码贡献。若有无意遗漏,请报告。
diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
index e1c0f11f395..f88a3cd2bfa 100644
--- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
+++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java
@@ -811,6 +811,11 @@ public interface ConfigurationKeys {
*/
String SERVER_ENABLE_CHECK_AUTH = SERVER_PREFIX + "enableCheckAuth";
+ /**
+ * The constant NAMING_SERVER
+ */
+ String NAMING_SERVER = "namingserver";
+
/**
* The constant APPLICATION_ID.
*/
@@ -1011,4 +1016,39 @@ public interface ConfigurationKeys {
* The constant ROCKET_MQ_MSG_TIMEOUT
*/
String ROCKET_MQ_MSG_TIMEOUT = SERVER_PREFIX + "rocketmqMsgTimeout";
+
+ /**
+ *
+ */
+ String NAMINGSERVER_REGISTRY_PREFIX = FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + NAMING_SERVER + FILE_CONFIG_SPLIT_CHAR;
+
+ /**
+ *
+ */
+ String SEATA_NAMINGSERVER_REGISTRY_PREFIX = SEATA_FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + NAMINGSERVER_REGISTRY_PREFIX;
+
+ /**
+ * The constant REGISTRY_NAMINGSERVER_CLUSTER
+ */
+ String REGISTRY_NAMINGSERVER_CLUSTER = NAMINGSERVER_REGISTRY_PREFIX + "cluster";
+
+ /**
+ * The constant MAPPING_TABLE_NAME
+ */
+ String MAPPING_TABLE_NAME = STORE_DB_PREFIX + FILE_CONFIG_SPLIT_CHAR + "mapping-table";
+
+ /**
+ * The constant NAMESPACE_KEY
+ */
+ String NAMESPACE_KEY = SEATA_NAMINGSERVER_REGISTRY_PREFIX + "namespace";
+
+ /**
+ * The constant CLUSTER_NAME_KEY
+ */
+ String CLUSTER_NAME_KEY = SEATA_FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + REGISTRY_NAMINGSERVER_CLUSTER;
+
+ /**
+ * The constant META_PREFIX
+ */
+ String META_PREFIX = SEATA_FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + "metadata.";
}
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/constants/NamingServerConstants.java b/common/src/main/java/org/apache/seata/common/NamingServerConstants.java
similarity index 90%
rename from namingserver/src/main/java/org/apache/seata/namingserver/constants/NamingServerConstants.java
rename to common/src/main/java/org/apache/seata/common/NamingServerConstants.java
index 91c11a36230..975a42d3ee0 100644
--- a/namingserver/src/main/java/org/apache/seata/namingserver/constants/NamingServerConstants.java
+++ b/common/src/main/java/org/apache/seata/common/NamingServerConstants.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seata.namingserver.constants;
+package org.apache.seata.common;
public interface NamingServerConstants {
/**
@@ -46,4 +46,10 @@ public interface NamingServerConstants {
* The constant IP_PORT_SPLIT_CHAR
*/
String IP_PORT_SPLIT_CHAR = ":";
+
+ /**
+ * The constant DEFAULT_VGROUP_MAPPING
+ */
+ String DEFAULT_VGROUP_MAPPING = "vgroup_table";
+
}
diff --git a/common/src/main/java/org/apache/seata/common/exception/ErrorCode.java b/common/src/main/java/org/apache/seata/common/exception/ErrorCode.java
index fb69cd328f9..8406d7db0c6 100644
--- a/common/src/main/java/org/apache/seata/common/exception/ErrorCode.java
+++ b/common/src/main/java/org/apache/seata/common/exception/ErrorCode.java
@@ -29,11 +29,18 @@ public enum ErrorCode {
/**
* 0100 ~ 0199 Security related errors
*/
- ERR_DESERIALIZATION_SECURITY(ErrorType.Security, 0156);
+ ERR_DESERIALIZATION_SECURITY(ErrorType.Security, 0156),
/**
* The error code of the transaction exception.
*/
+
+
+ /**
+ * The error code of the sql exception
+ */
+ ERROR_SQL(ErrorType.Datasource, 0201);
+
private int code;
private ErrorType type;
@@ -82,10 +89,6 @@ enum ErrorType {
* Network error type.
*/
Network,
- /**
- * Security related error type.
- */
- Security,
/**
* Tm error type.
*/
@@ -102,6 +105,10 @@ enum ErrorType {
* Datasource error type.
*/
Datasource,
+ /**
+ * Security error type.
+ */
+ Security,
/**
* Other error type.
*/
diff --git a/common/src/main/java/org/apache/seata/common/metadata/Cluster.java b/common/src/main/java/org/apache/seata/common/metadata/Cluster.java
index 2ba769f58d8..2dcfec0fd64 100644
--- a/common/src/main/java/org/apache/seata/common/metadata/Cluster.java
+++ b/common/src/main/java/org/apache/seata/common/metadata/Cluster.java
@@ -26,6 +26,7 @@ public class Cluster {
private String clusterType;
private List unitData = new ArrayList<>();
+
public Cluster() {
}
diff --git a/core/src/main/java/org/apache/seata/core/store/MappingDO.java b/core/src/main/java/org/apache/seata/core/store/MappingDO.java
new file mode 100644
index 00000000000..57e0c42e458
--- /dev/null
+++ b/core/src/main/java/org/apache/seata/core/store/MappingDO.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.store;
+
+public class MappingDO {
+ private String namespace;
+
+ private String cluster;
+
+ private String unit;
+
+ private String vGroup;
+
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public void setNamespace(String namespace) {
+ this.namespace = namespace;
+ }
+
+ public String getCluster() {
+ return cluster;
+ }
+
+ public void setCluster(String cluster) {
+ this.cluster = cluster;
+ }
+
+ public String getUnit() {
+ return unit;
+ }
+
+ public void setUnit(String unit) {
+ this.unit = unit;
+ }
+
+ public String getVGroup() {
+ return vGroup;
+ }
+
+ public void setVGroup(String vGroup) {
+ this.vGroup = vGroup;
+ }
+
+}
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java
index 9d0cb9b8324..07074b6db19 100644
--- a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java
@@ -26,7 +26,7 @@
import org.apache.seata.common.metadata.namingserver.Unit;
import org.apache.seata.common.result.Result;
import org.apache.seata.common.util.HttpClientUtil;
-import org.apache.seata.namingserver.constants.NamingServerConstants;
+import org.apache.seata.common.NamingServerConstants;
import org.apache.seata.namingserver.listener.ClusterChangeEvent;
import org.apache.seata.namingserver.entity.pojo.ClusterData;
import org.apache.seata.namingserver.entity.vo.monitor.ClusterVO;
@@ -55,7 +55,7 @@
import javax.annotation.PostConstruct;
-import static org.apache.seata.namingserver.constants.NamingServerConstants.CONSTANT_GROUP;
+import static org.apache.seata.common.NamingServerConstants.CONSTANT_GROUP;
@Component
public class NamingManager {
diff --git a/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java b/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java
index c9bf44edbf4..6ea5b9552ed 100644
--- a/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java
+++ b/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java
@@ -35,7 +35,7 @@
import java.util.UUID;
-import static org.apache.seata.namingserver.constants.NamingServerConstants.CONSTANT_GROUP;
+import static org.apache.seata.common.NamingServerConstants.CONSTANT_GROUP;
import static org.junit.jupiter.api.Assertions.*;
diff --git a/script/server/db/dm.sql b/script/server/db/dm.sql
index fd9d067abd0..ec3d1a27b87 100644
--- a/script/server/db/dm.sql
+++ b/script/server/db/dm.sql
@@ -91,3 +91,12 @@ INSERT INTO "SEATA"."DISTRIBUTED_LOCK" ("LOCK_KEY", "LOCK_VALUE", "EXPIRE") VALU
INSERT INTO "SEATA"."DISTRIBUTED_LOCK" ("LOCK_KEY", "LOCK_VALUE", "EXPIRE") VALUES ('RetryCommitting', ' ', 0);
INSERT INTO "SEATA"."DISTRIBUTED_LOCK" ("LOCK_KEY", "LOCK_VALUE", "EXPIRE") VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO "SEATA"."DISTRIBUTED_LOCK" ("LOCK_KEY", "LOCK_VALUE", "EXPIRE") VALUES ('TxTimeoutCheck', ' ', 0);
+
+
+CREATE TABLE "SEATA"."VGROUP_TABLE"
+(
+ `VGROUP` VARCHAR2(255),
+ `NAMESPACE` VARCHAR2(255),
+ `CLUSTER` VARCHAR2(255),
+ PRIMARY KEY (`VGROUP`)
+);
\ No newline at end of file
diff --git a/script/server/db/mysql.sql b/script/server/db/mysql.sql
index bc2e3926f81..fe0811dded2 100644
--- a/script/server/db/mysql.sql
+++ b/script/server/db/mysql.sql
@@ -87,4 +87,14 @@ CREATE TABLE IF NOT EXISTS `distributed_lock`
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
-INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
\ No newline at end of file
+INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
+
+
+CREATE TABLE IF NOT EXISTS `vgroup_table`
+(
+ `vGroup` VARCHAR(255),
+ `namespace` VARCHAR(255),
+ `cluster` VARCHAR(255),
+ primary key (`vGroup`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4;
\ No newline at end of file
diff --git a/script/server/db/oracle.sql b/script/server/db/oracle.sql
index 143a7f22f70..797603b7976 100644
--- a/script/server/db/oracle.sql
+++ b/script/server/db/oracle.sql
@@ -87,3 +87,10 @@ INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('AsyncCommit
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
+
+CREATE TABLE vgroup_table
+(
+ vGroup VARCHAR2(255) PRIMARY KEY,
+ namespace VARCHAR2(255),
+ cluster VARCHAR2(255)
+);
\ No newline at end of file
diff --git a/script/server/db/postgresql.sql b/script/server/db/postgresql.sql
index e29e5908bc5..91fd2128a07 100644
--- a/script/server/db/postgresql.sql
+++ b/script/server/db/postgresql.sql
@@ -87,3 +87,11 @@ INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('AsyncCommit
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
+
+CREATE TABLE IF NOT EXISTS vgroup_table
+(
+ vGroup VARCHAR(255),
+ namespace VARCHAR(255),
+ cluster VARCHAR(255),
+ PRIMARY KEY (vGroup)
+);
\ No newline at end of file
diff --git a/script/server/db/sqlserver.sql b/script/server/db/sqlserver.sql
index 729cc1178ea..d39464f5549 100644
--- a/script/server/db/sqlserver.sql
+++ b/script/server/db/sqlserver.sql
@@ -117,3 +117,12 @@ INSERT INTO [distributed_lock] (lock_key, lock_value, expire) VALUES ('RetryComm
INSERT INTO [distributed_lock] (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO [distributed_lock] (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
INSERT INTO [distributed_lock] (lock_key, lock_value, expire) VALUES ('UndologDelete', ' ', 0);
+
+CREATE TABLE [vgroup_table]
+(
+ [vGroup] nvarchar(255) NOT NULL,
+ [namespace] nvarchar(255) NOT NULL,
+ [cluster] nvarchar(255) NOT NULL,
+ PRIMARY KEY CLUSTERED ([vGroup])
+ WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON)
+)
\ No newline at end of file
diff --git a/server/pom.xml b/server/pom.xml
index ca1ad4a50e5..341c90cc2a5 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -282,6 +282,12 @@
org.apache.tomcat.embed
tomcat-embed-core
+
+
+ org.codehaus.jackson
+ jackson-mapper-asl
+ ${jackson-mapper.version}
+
diff --git a/server/src/main/java/org/apache/seata/server/Server.java b/server/src/main/java/org/apache/seata/server/Server.java
index 1ebd10a5b82..40616914c51 100644
--- a/server/src/main/java/org/apache/seata/server/Server.java
+++ b/server/src/main/java/org/apache/seata/server/Server.java
@@ -16,11 +16,16 @@
*/
package org.apache.seata.server;
+import java.util.Objects;
+import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+
import org.apache.seata.common.XID;
import org.apache.seata.common.holder.ObjectHolder;
+import org.apache.seata.common.metadata.Node;
+import org.apache.seata.common.metadata.namingserver.Instance;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
@@ -31,20 +36,75 @@
import org.apache.seata.server.lock.LockerManagerFactory;
import org.apache.seata.server.metrics.MetricsManager;
import org.apache.seata.server.session.SessionHolder;
+import org.apache.seata.server.store.StoreConfig;
+import org.apache.seata.server.store.VGroupMappingStoreManager;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationListener;
+import org.springframework.core.env.ConfigurableEnvironment;
+import org.springframework.core.env.EnumerablePropertySource;
+import org.springframework.core.env.PropertySource;
import org.springframework.web.context.support.GenericWebApplicationContext;
+import static org.apache.seata.common.ConfigurationKeys.CLUSTER_NAME_KEY;
+import static org.apache.seata.common.ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR;
+import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_REGISTRY;
+import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_TYPE;
+import static org.apache.seata.common.ConfigurationKeys.META_PREFIX;
+import static org.apache.seata.common.ConfigurationKeys.NAMESPACE_KEY;
+import static org.apache.seata.common.ConfigurationKeys.NAMING_SERVER;
import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_APPLICATION_CONTEXT;
+import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGEX_SPLIT_CHAR;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_PREFERED_NETWORKS;
/**
* The type Server.
- *
*/
public class Server {
+
+ public static void metadataInit() {
+
+ ConfigurableEnvironment environment = (ConfigurableEnvironment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT);
+
+ // load node properties
+ Instance instance = Instance.getInstance();
+ // load namespace
+ String namespace = environment.getProperty(NAMESPACE_KEY, "public");
+ instance.setNamespace(namespace);
+ // load cluster name
+ String clusterName = environment.getProperty(CLUSTER_NAME_KEY, "default");
+ instance.setClusterName(clusterName);
+
+ // load cluster type
+ String clusterType = String.valueOf(StoreConfig.getSessionMode());
+ instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default");
+
+ // load unit name
+ instance.setUnit(String.valueOf(UUID.randomUUID()));
+
+ // load node Endpoint
+ instance.setControlEndpoint(new Node.Endpoint(NetUtil.getLocalIp(), Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http"));
+
+ // load metadata
+ for (PropertySource> propertySource : environment.getPropertySources()) {
+ if (propertySource instanceof EnumerablePropertySource) {
+ EnumerablePropertySource> enumerablePropertySource = (EnumerablePropertySource>) propertySource;
+ for (String propertyName : enumerablePropertySource.getPropertyNames()) {
+ if (propertyName.startsWith(META_PREFIX)) {
+ instance.addMetadata(propertyName.substring(META_PREFIX.length()), enumerablePropertySource.getProperty(propertyName));
+ }
+ }
+ }
+ }
+
+ // load vgroup mapping relationship
+ VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager();
+ instance.addMetadata("vGroup", vGroupMappingStoreManager.loadVGroups());
+ vGroupMappingStoreManager.notifyMapping();
+ }
+
+
/**
* The entry point of application.
*
@@ -60,9 +120,9 @@ public static void start(String[] args) {
MetricsManager.get().init();
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
- NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
- new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
+ NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
+ new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
//127.0.0.1 and 0.0.0.0 are not valid here.
if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
@@ -79,14 +139,14 @@ public static void start(String[] args) {
XID.setPort(nettyRemotingServer.getListenPort());
UUIDGenerator.init(parameterParser.getServerNode());
ConfigurableListableBeanFactory beanFactory =
- ((GenericWebApplicationContext)ObjectHolder.INSTANCE
+ ((GenericWebApplicationContext) ObjectHolder.INSTANCE
.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT)).getBeanFactory();
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
if (coordinator instanceof ApplicationListener) {
beanFactory.registerSingleton(NettyRemotingServer.class.getName(), nettyRemotingServer);
beanFactory.registerSingleton(DefaultCoordinator.class.getName(), coordinator);
- ((GenericWebApplicationContext)ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
- .addApplicationListener((ApplicationListener>)coordinator);
+ ((GenericWebApplicationContext) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT))
+ .addApplicationListener((ApplicationListener>) coordinator);
}
//log store mode : file, db, redis
SessionHolder.init();
@@ -96,7 +156,10 @@ public static void start(String[] args) {
// let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028
ServerRunner.addDisposable(coordinator);
-
+ if (StringUtils.equals(ConfigurationFactory.getInstance().getConfig(FILE_ROOT_REGISTRY
+ + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_TYPE), NAMING_SERVER)) {
+ metadataInit();
+ }
nettyRemotingServer.init();
}
}
diff --git a/server/src/main/java/org/apache/seata/server/controller/NamingController.java b/server/src/main/java/org/apache/seata/server/controller/NamingController.java
new file mode 100644
index 00000000000..cc521ca9f34
--- /dev/null
+++ b/server/src/main/java/org/apache/seata/server/controller/NamingController.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.controller;
+
+import org.apache.seata.common.metadata.namingserver.Instance;
+import org.apache.seata.common.result.Result;
+import org.apache.seata.common.util.StringUtils;
+import org.apache.seata.config.Configuration;
+import org.apache.seata.config.ConfigurationFactory;
+import org.apache.seata.core.store.MappingDO;
+import org.apache.seata.server.session.SessionHolder;
+import org.apache.seata.server.store.VGroupMappingStoreManager;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.PostConstruct;
+
+import static org.apache.seata.common.ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR;
+import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_REGISTRY;
+import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_TYPE;
+import static org.apache.seata.common.ConfigurationKeys.NAMING_SERVER;
+
+@RestController
+@RequestMapping("/naming/v1")
+public class NamingController {
+
+ private VGroupMappingStoreManager vGroupMappingStoreManager;
+
+ protected static final Configuration CONFIG = ConfigurationFactory.getInstance();
+
+ @PostConstruct
+ private void init() {
+ if (StringUtils.equals(ConfigurationFactory.getInstance().getConfig(FILE_ROOT_REGISTRY
+ + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_TYPE), NAMING_SERVER)) {
+ vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager();
+ }
+ }
+
+ /**
+ * add vGroup in cluster
+ *
+ * @param vGroup
+ * @return
+ */
+ @GetMapping("/addVGroup")
+ public Result> addVGroup(@RequestParam String vGroup, @RequestParam String unit) {
+ Result> result = new Result<>();
+ MappingDO mappingDO = new MappingDO();
+ mappingDO.setNamespace(Instance.getInstance().getNamespace());
+ mappingDO.setCluster(Instance.getInstance().getClusterName());
+ mappingDO.setUnit(unit);
+ mappingDO.setVGroup(vGroup);
+ boolean rst = vGroupMappingStoreManager.addVGroup(mappingDO);
+ if (!rst) {
+ result.setCode("500");
+ result.setMessage("add vGroup failed!");
+ }
+ // push the newest mapping relationship
+ vGroupMappingStoreManager.notifyMapping();
+ return result;
+ }
+
+ /**
+ * remove vGroup in cluster
+ *
+ * @param vGroup
+ * @return
+ */
+ @GetMapping("/removeVGroup")
+ public Result> removeVGroup(@RequestParam String vGroup) {
+ Result> result = new Result<>();
+ boolean rst = vGroupMappingStoreManager.removeVGroup(vGroup);
+ if (!rst) {
+ result.setCode("500");
+ result.setMessage("remove vGroup failed!");
+ }
+ // push the newest mapping relationship
+ vGroupMappingStoreManager.notifyMapping();
+ return result;
+ }
+
+
+}
diff --git a/server/src/main/java/org/apache/seata/server/session/SessionHolder.java b/server/src/main/java/org/apache/seata/server/session/SessionHolder.java
index f32839e78ec..381be450f9f 100644
--- a/server/src/main/java/org/apache/seata/server/session/SessionHolder.java
+++ b/server/src/main/java/org/apache/seata/server/session/SessionHolder.java
@@ -43,6 +43,7 @@
import org.apache.seata.server.lock.distributed.DistributedLockerFactory;
import org.apache.seata.server.store.StoreConfig;
import org.apache.seata.server.store.StoreConfig.SessionMode;
+import org.apache.seata.server.store.VGroupMappingStoreManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +55,6 @@
/**
* The type Session holder.
- *
*/
public class SessionHolder {
@@ -75,6 +75,13 @@ public class SessionHolder {
*/
private static long DISTRIBUTED_LOCK_EXPIRE_TIME = CONFIG.getLong(ConfigurationKeys.DISTRIBUTED_LOCK_EXPIRE_TIME, DEFAULT_DISTRIBUTED_LOCK_EXPIRE_TIME);
+ /**
+ * The default vgroup mapping store dir
+ */
+ public static final String DEFAULT_VGROUP_MAPPING_STORE_FILE_DIR = System.getProperty("user.dir");
+
+ private static VGroupMappingStoreManager ROOT_VGROUP_MAPPING_MANAGER;
+
private static SessionManager ROOT_SESSION_MANAGER;
private static volatile Map SESSION_MANAGER_MAP;
@@ -83,6 +90,7 @@ public class SessionHolder {
public static void init() {
init(null);
}
+
/**
* Init.
*
@@ -98,6 +106,8 @@ public static void init(SessionMode sessionMode) {
if (SessionMode.DB.equals(sessionMode)) {
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.DB.getName());
reload(sessionMode);
+
+ ROOT_VGROUP_MAPPING_MANAGER = EnhancedServiceLoader.load(VGroupMappingStoreManager.class, SessionMode.DB.getName());
} else if (SessionMode.RAFT.equals(sessionMode) || SessionMode.FILE.equals(sessionMode)) {
RaftServerManager.init();
if (CollectionUtils.isNotEmpty(RaftServerManager.getRaftServers())) {
@@ -106,23 +116,31 @@ public static void init(SessionMode sessionMode) {
if (SessionMode.RAFT.equals(sessionMode)) {
String group = CONFIG.getConfig(ConfigurationKeys.SERVER_RAFT_GROUP, DEFAULT_SEATA_GROUP);
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.RAFT.getName(),
- new Object[] {ROOT_SESSION_MANAGER_NAME});
+ new Object[]{ROOT_SESSION_MANAGER_NAME});
SESSION_MANAGER_MAP = new HashMap<>();
SESSION_MANAGER_MAP.put(group, ROOT_SESSION_MANAGER);
RaftServerManager.start();
} else {
+ String vGroupMappingStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR,
+ DEFAULT_VGROUP_MAPPING_STORE_FILE_DIR);
String sessionStorePath =
CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR, DEFAULT_SESSION_STORE_FILE_DIR) + separator
+ System.getProperty(SERVER_SERVICE_PORT_CAMEL);
- if (StringUtils.isBlank(sessionStorePath)) {
+ if (StringUtils.isBlank(sessionStorePath) || StringUtils.isBlank(vGroupMappingStorePath)) {
throw new StoreException("the {store.file.dir} is empty.");
}
+ ROOT_VGROUP_MAPPING_MANAGER = EnhancedServiceLoader.load(VGroupMappingStoreManager.class, SessionMode.FILE.getName(),
+ new Object[]{vGroupMappingStorePath});
+
+ ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.FILE.getName(),
+ new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath});
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.FILE.getName(),
- new Object[] {ROOT_SESSION_MANAGER_NAME, sessionStorePath});
+ new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath});
reload(sessionMode);
}
} else if (SessionMode.REDIS.equals(sessionMode)) {
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.REDIS.getName());
+ ROOT_VGROUP_MAPPING_MANAGER = EnhancedServiceLoader.load(VGroupMappingStoreManager.class, SessionMode.REDIS.getName());
reload(sessionMode);
} else {
// unknown store
@@ -137,7 +155,7 @@ public static void init(SessionMode sessionMode) {
*/
protected static void reload(SessionMode sessionMode) {
if (sessionMode == SessionMode.FILE) {
- ((Reloadable)ROOT_SESSION_MANAGER).reload();
+ ((Reloadable) ROOT_SESSION_MANAGER).reload();
reload(ROOT_SESSION_MANAGER.allSessions(), sessionMode);
} else {
reload(null, sessionMode);
@@ -231,7 +249,7 @@ public static void reload(Collection allSessions, SessionMode sto
// Redis, db and so on
CompletableFuture.runAsync(() -> {
SessionCondition searchCondition = new SessionCondition(GlobalStatus.UnKnown, GlobalStatus.Committed,
- GlobalStatus.Rollbacked, GlobalStatus.TimeoutRollbacked, GlobalStatus.Finished);
+ GlobalStatus.Rollbacked, GlobalStatus.TimeoutRollbacked, GlobalStatus.Finished);
searchCondition.setLazyLoadBranch(true);
long now = System.currentTimeMillis();
@@ -277,6 +295,16 @@ private static void lockBranchSessions(List branchSessions) {
}
+ //region get group mapping manager
+ public static VGroupMappingStoreManager getRootVGroupMappingManager() {
+ if (ROOT_VGROUP_MAPPING_MANAGER == null) {
+ init();
+ if (ROOT_VGROUP_MAPPING_MANAGER == null) {
+ throw new ShouldNeverHappenException("vGroupMappingManager is NOT init!");
+ }
+ }
+ return ROOT_VGROUP_MAPPING_MANAGER;
+ }
//endregion
@@ -328,7 +356,7 @@ public static GlobalSession findGlobalSession(String xid, boolean withBranchSess
* @return the value
*/
public static T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable lockCallable)
- throws TransactionException {
+ throws TransactionException {
return getRootSessionManager().lockAndExecute(globalSession, lockCallable);
}
diff --git a/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java
new file mode 100644
index 00000000000..6389168daef
--- /dev/null
+++ b/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.storage.db.store;
+
+import org.apache.seata.common.loader.EnhancedServiceLoader;
+import org.apache.seata.common.loader.LoadLevel;
+import org.apache.seata.common.metadata.namingserver.Instance;
+import org.apache.seata.config.Configuration;
+import org.apache.seata.config.ConfigurationFactory;
+import org.apache.seata.core.constants.ConfigurationKeys;
+import org.apache.seata.core.store.MappingDO;
+import org.apache.seata.core.store.db.DataSourceProvider;
+import org.apache.seata.server.store.VGroupMappingStoreManager;
+
+import javax.sql.DataSource;
+import java.util.HashMap;
+import java.util.List;
+
+@LoadLevel(name = "db")
+public class DataBaseVGroupMappingStoreManager implements VGroupMappingStoreManager {
+ protected VGroupMappingDataBaseDAO vGroupMappingDataBaseDAO;
+
+ protected static final Configuration CONFIG = ConfigurationFactory.getInstance();
+
+ public DataBaseVGroupMappingStoreManager() {
+ String datasourceType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE);
+ //init dataSource
+ DataSource vGroupMappingDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide();
+ vGroupMappingDataBaseDAO = new VGroupMappingDataBaseDAO(vGroupMappingDataSource);
+ }
+
+ @Override
+ public boolean addVGroup(MappingDO mappingDO) {
+ return vGroupMappingDataBaseDAO.insertMappingDO(mappingDO);
+ }
+
+ @Override
+ public boolean removeVGroup(String vGroup) {
+ return vGroupMappingDataBaseDAO.deleteMappingDOByVGroup(vGroup);
+ }
+
+ @Override
+ public HashMap loadVGroups() {
+ List mappingDOS = vGroupMappingDataBaseDAO.queryMappingDO();
+ Instance instance = Instance.getInstance();
+ HashMap mappings = new HashMap<>();
+ for (MappingDO mappingDO : mappingDOS) {
+ if (mappingDO.getCluster() != null && mappingDO.getCluster().equals(instance.getClusterName())) {
+ mappings.put(mappingDO.getVGroup(), null);
+ }
+ }
+ return mappings;
+ }
+
+
+}
diff --git a/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java b/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java
new file mode 100644
index 00000000000..81d227eb809
--- /dev/null
+++ b/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.storage.db.store;
+
+import org.apache.seata.common.exception.ErrorCode;
+import org.apache.seata.common.exception.SeataRuntimeException;
+import org.apache.seata.common.metadata.namingserver.Instance;
+import org.apache.seata.common.util.IOUtil;
+import org.apache.seata.config.Configuration;
+import org.apache.seata.config.ConfigurationFactory;
+import org.apache.seata.core.store.MappingDO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.seata.common.ConfigurationKeys.MAPPING_TABLE_NAME;
+import static org.apache.seata.common.ConfigurationKeys.REGISTRY_NAMINGSERVER_CLUSTER;
+import static org.apache.seata.common.NamingServerConstants.DEFAULT_VGROUP_MAPPING;
+
+
+public class VGroupMappingDataBaseDAO {
+ private static final Logger LOGGER = LoggerFactory.getLogger(VGroupMappingDataBaseDAO.class);
+
+ protected DataSource vGroupMappingDataSource = null;
+
+ protected final String vMapping;
+
+ protected static final Configuration CONFIG = ConfigurationFactory.getInstance();
+
+ public VGroupMappingDataBaseDAO(DataSource vGroupMappingDataSource) {
+ this.vGroupMappingDataSource = vGroupMappingDataSource;
+ this.vMapping = CONFIG.getConfig(MAPPING_TABLE_NAME, DEFAULT_VGROUP_MAPPING);
+ }
+
+ public boolean insertMappingDO(MappingDO mappingDO) {
+ clearMappingDOByVGroup(mappingDO.getVGroup());
+ String sql = "INSERT INTO " + vMapping + " (vgroup,namespace, cluster) VALUES (?, ?, ?)";
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ int index = 1;
+ conn = vGroupMappingDataSource.getConnection();
+ conn.setAutoCommit(true);
+ ps = conn.prepareStatement(sql);
+ ps.setString(index++, mappingDO.getVGroup());
+ ps.setString(index++, mappingDO.getNamespace());
+ ps.setString(index++, mappingDO.getCluster());
+
+ return ps.executeUpdate() > 0;
+ } catch (SQLException e) {
+ throw new SeataRuntimeException(ErrorCode.ERR_CONFIG, e);
+ } finally {
+ IOUtil.close(ps, conn);
+ }
+ }
+
+ public boolean clearMappingDOByVGroup(String vGroup) {
+ String sql = "DELETE FROM " + vMapping + " WHERE vGroup = ?";
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ conn = vGroupMappingDataSource.getConnection();
+ conn.setAutoCommit(true);
+ ps = conn.prepareStatement(sql);
+ ps.setString(1, vGroup);
+ return ps.executeUpdate() > 0;
+ } catch (SQLException e) {
+ throw new SeataRuntimeException(ErrorCode.ERR_CONFIG, e);
+ } finally {
+ IOUtil.close(ps, conn);
+ }
+ }
+
+ public boolean deleteMappingDOByVGroup(String vGroup) {
+ String sql = "DELETE FROM " + vMapping + " WHERE vGroup = ? and cluster = ?";
+ Instance instance = Instance.getInstance();
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ conn = vGroupMappingDataSource.getConnection();
+ conn.setAutoCommit(true);
+ ps = conn.prepareStatement(sql);
+ ps.setString(1, vGroup);
+ ps.setString(2, instance.getClusterName());
+ return ps.executeUpdate() > 0;
+ } catch (SQLException e) {
+ throw new SeataRuntimeException(ErrorCode.ERROR_SQL,e);
+ } finally {
+ IOUtil.close(ps, conn);
+ }
+ }
+
+ public List queryMappingDO() {
+ String sql = "SELECT vgroup,namespace, cluster FROM " + vMapping
+ + " WHERE cluster = ?";
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ List result = new ArrayList<>();
+
+ try {
+ conn = vGroupMappingDataSource.getConnection();
+ ps = conn.prepareStatement(sql);
+ ps.setString(1, CONFIG.getConfig(REGISTRY_NAMINGSERVER_CLUSTER));
+ rs = ps.executeQuery();
+
+ while (rs.next()) {
+ MappingDO mappingDO = new MappingDO();
+ mappingDO.setNamespace(rs.getString("namespace"));
+ mappingDO.setCluster(rs.getString("cluster"));
+ mappingDO.setVGroup(rs.getString("vGroup"));
+ result.add(mappingDO);
+ }
+ } catch (SQLException e) {
+ throw new SeataRuntimeException(ErrorCode.ERR_CONFIG, e);
+ } finally {
+ IOUtil.close(rs, ps, conn);
+ }
+
+ return result;
+ }
+
+
+}
diff --git a/server/src/main/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManager.java
new file mode 100644
index 00000000000..4b4fd26e0dd
--- /dev/null
+++ b/server/src/main/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManager.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.storage.file.store;
+
+
+
+import org.apache.seata.common.loader.LoadLevel;
+import org.apache.seata.config.Configuration;
+import org.apache.seata.config.ConfigurationFactory;
+import org.apache.seata.core.store.MappingDO;
+import org.apache.seata.server.store.VGroupMappingStoreManager;
+import org.apache.commons.io.FileUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+@LoadLevel(name = "file")
+public class FileVGroupMappingStoreManager implements VGroupMappingStoreManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FileVGroupMappingStoreManager.class);
+
+ public static final String ROOT_MAPPING_MANAGER_NAME = "vgroup_mapping.json";
+
+ private final ReentrantLock writeLock = new ReentrantLock();
+
+ private String storePath;
+
+ protected static final Configuration CONFIG = ConfigurationFactory.getInstance();
+
+
+ public FileVGroupMappingStoreManager() {
+ }
+
+ public FileVGroupMappingStoreManager(String mappingStoreFilePath) {
+ storePath = mappingStoreFilePath + File.separator + ROOT_MAPPING_MANAGER_NAME;
+ }
+
+ @Override
+ public boolean addVGroup(MappingDO mappingDO) {
+ HashMap vGroupMapping = loadVGroups();
+ vGroupMapping.put(mappingDO.getVGroup(), mappingDO.getUnit());
+ boolean isSaved = save(vGroupMapping);
+ if (!isSaved) {
+ LOGGER.error("add mapping relationship failed!");
+ }
+ return isSaved;
+ }
+
+ @Override
+ public boolean removeVGroup(String vGroup) {
+ HashMap vGroupMapping = loadVGroups();
+ vGroupMapping.remove(vGroup);
+ boolean isSaved = save(vGroupMapping);
+ if (!isSaved) {
+ LOGGER.error("remove mapping relationship failed!");
+ }
+ return isSaved;
+ }
+
+ @Override
+ public HashMap loadVGroups() {
+ HashMap vGroupMapping = new HashMap<>();
+ try {
+ File fileToLoad = new File(storePath);
+ if (!fileToLoad.exists()) {
+ try {
+ // create new file to record vgroup mapping relationship
+ boolean fileCreated = fileToLoad.createNewFile();
+ if (fileCreated) {
+ LOGGER.info("New vgroup file created at path: " + storePath);
+ } else {
+ LOGGER.warn("Failed to create a new vgroup file at path: " + storePath);
+ }
+ } catch (IOException e) {
+ LOGGER.error("Error while creating a new file: " + e.getMessage());
+ }
+ }
+
+ String fileContent = FileUtils.readFileToString(fileToLoad, "UTF-8");
+
+ if (!fileContent.isEmpty()) {
+ ObjectMapper objectMapper = new ObjectMapper();
+ vGroupMapping = objectMapper.readValue(fileContent, new TypeReference>() {
+ });
+ }
+
+
+ } catch (Exception e) {
+ LOGGER.error("mapping relationship load failed! " + e);
+ }
+ return vGroupMapping;
+ }
+
+
+ public boolean save(HashMap vGroupMapping) {
+ writeLock.lock();
+ try {
+ ObjectMapper objectMapper = new ObjectMapper();
+ String jsonMapping = objectMapper.writeValueAsString(vGroupMapping);
+ FileUtils.writeStringToFile(new File(storePath), jsonMapping, "UTF-8");
+ return true;
+ } catch (IOException e) {
+ LOGGER.error("mapping relationship saved failed! ", e);
+ return false;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java
new file mode 100644
index 00000000000..032b46b4b88
--- /dev/null
+++ b/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.storage.redis.store;
+
+import org.apache.seata.common.exception.RedisException;
+import org.apache.seata.common.loader.LoadLevel;
+import org.apache.seata.common.metadata.namingserver.Instance;
+import org.apache.seata.core.store.MappingDO;
+import org.apache.seata.server.storage.redis.JedisPooledFactory;
+import org.apache.seata.server.store.VGroupMappingStoreManager;
+import redis.clients.jedis.Jedis;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@LoadLevel(name = "redis")
+public class RedisVGroupMappingStoreManager implements VGroupMappingStoreManager {
+
+ private static final String REDIS_SPLIT_KEY = ":";
+
+ @Override
+ public boolean addVGroup(MappingDO mappingDO) {
+ String vGroup = mappingDO.getVGroup();
+ String namespace = mappingDO.getNamespace();
+ String clusterName = mappingDO.getCluster();
+ try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
+ jedis.hset(namespace, vGroup, clusterName);
+ return true;
+ } catch (Exception ex) {
+ throw new RedisException(ex);
+ }
+ }
+
+ @Override
+ public boolean removeVGroup(String vGroup) {
+ Instance instance = Instance.getInstance();
+ String namespace = instance.getNamespace();
+ try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
+ jedis.hdel(namespace, vGroup);
+ return true;
+ } catch (Exception ex) {
+ throw new RedisException(ex);
+ }
+ }
+
+ @Override
+ public HashMap loadVGroups() {
+ Instance instance = Instance.getInstance();
+ String namespace = instance.getNamespace();
+ String clusterName = instance.getClusterName();
+ try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
+ Map mappingKeyMap = jedis.hgetAll(namespace);
+ HashMap result = new HashMap<>();
+ for (Map.Entry entry : mappingKeyMap.entrySet()) {
+ result.put(entry.getKey(), null);
+ }
+ return result;
+ } catch (Exception ex) {
+ throw new RedisException(ex);
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java
new file mode 100644
index 00000000000..6f42f9822b6
--- /dev/null
+++ b/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.store;
+
+import org.apache.seata.common.XID;
+import org.apache.seata.common.metadata.namingserver.Instance;
+import org.apache.seata.core.store.MappingDO;
+import org.apache.seata.discovery.registry.MultiRegistryFactory;
+import org.apache.seata.discovery.registry.RegistryService;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+
+public interface VGroupMappingStoreManager {
+ /**
+ * add VGroup Mapping relationship in cluster
+ *
+ * @param mappingDO the relationship between vGroup and Cluster
+ */
+ boolean addVGroup(MappingDO mappingDO);
+
+ /**
+ * remove VGroup Mapping relationship in cluster
+ *
+ * @param vGroup
+ */
+ boolean removeVGroup(String vGroup);
+
+ /**
+ * get VGroup Mapping relationship in cluster
+ *
+ * @return Key:vGroup,Value:unit
+ */
+ HashMap loadVGroups();
+
+ /**
+ * notify mapping relationship to all namingserver nodes
+ */
+ default void notifyMapping() {
+
+ Instance instance = Instance.getInstance();
+ instance.addMetadata("vGroup", this.loadVGroups());
+ try {
+ InetSocketAddress address = new InetSocketAddress(XID.getIpAddress(), XID.getPort());
+ for (RegistryService registryService : MultiRegistryFactory.getInstances()) {
+ registryService.register(address);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("vGroup mapping relationship notified failed! ", e);
+ }
+ }
+
+}
diff --git a/server/src/main/resources/META-INF/services/org.apache.seata.server.store.VGroupMappingStoreManager b/server/src/main/resources/META-INF/services/org.apache.seata.server.store.VGroupMappingStoreManager
new file mode 100644
index 00000000000..25265e50995
--- /dev/null
+++ b/server/src/main/resources/META-INF/services/org.apache.seata.server.store.VGroupMappingStoreManager
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.seata.server.storage.db.store.DataBaseVGroupMappingStoreManager
+org.apache.seata.server.storage.file.store.FileVGroupMappingStoreManager
+org.apache.seata.server.storage.redis.store.RedisVGroupMappingStoreManager
\ No newline at end of file
diff --git a/server/src/main/resources/application.example.yml b/server/src/main/resources/application.example.yml
index fb4cfad34a6..b14414b6cd9 100644
--- a/server/src/main/resources/application.example.yml
+++ b/server/src/main/resources/application.example.yml
@@ -76,6 +76,13 @@ seata:
# support: nacos 、 eureka 、 redis 、 zk 、 consul 、 etcd3 、 sofa
type: file
preferred-networks: 30.240.*
+ metadata:
+ weight: 1
+ namingserver:
+ server-addr: 127.0.0.1:8080
+ cluster: default
+ namespace: public
+ heartbeat-period: 1000
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
diff --git a/server/src/test/java/org/apache/seata/server/controller/NamingControllerTest.java b/server/src/test/java/org/apache/seata/server/controller/NamingControllerTest.java
new file mode 100644
index 00000000000..8ea8997aae3
--- /dev/null
+++ b/server/src/test/java/org/apache/seata/server/controller/NamingControllerTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.controller;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+
+
+@Disabled
+@SpringBootTest
+class NamingControllerTest {
+ @Autowired
+ private NamingController namingController;
+
+ @Test
+ void addVGroup() {
+ namingController.addVGroup("group1","unit1");
+ }
+
+ @Test
+ void removeVGroup() {
+ namingController.removeVGroup("group1");
+ }
+}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManagerTest.java b/server/src/test/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManagerTest.java
new file mode 100644
index 00000000000..e0343118a60
--- /dev/null
+++ b/server/src/test/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManagerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.storage.file.store;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.seata.core.store.MappingDO;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+@SpringBootTest
+public class FileVGroupMappingStoreManagerTest {
+
+ private FileVGroupMappingStoreManager fileVGroupMappingStoreManager;
+ private static final String STORE_PATH = "sessionStore/vgroup_mapping.json";
+ private static final String VGROUP_NAME = "testVGroup";
+ private static final String UNIT = "testUnit";
+
+ @BeforeEach
+ public void setUp() {
+ fileVGroupMappingStoreManager = new FileVGroupMappingStoreManager("sessionStore");
+ File file = new File(STORE_PATH);
+ if (file.exists()) {
+ file.delete();
+ }
+ }
+
+ @Test
+ public void testAddVGroupSuccess() {
+ MappingDO mappingDO = new MappingDO();
+ mappingDO.setVGroup(VGROUP_NAME);
+ mappingDO.setUnit(UNIT);
+
+ assertTrue(fileVGroupMappingStoreManager.addVGroup(mappingDO));
+
+ HashMap vGroups = fileVGroupMappingStoreManager.loadVGroups();
+ assertEquals(UNIT, vGroups.get(VGROUP_NAME));
+ }
+
+ @Test
+ public void testRemoveVGroupSuccess() {
+ MappingDO mappingDO = new MappingDO();
+ mappingDO.setVGroup(VGROUP_NAME);
+ mappingDO.setUnit(UNIT);
+
+ fileVGroupMappingStoreManager.addVGroup(mappingDO);
+ assertTrue(fileVGroupMappingStoreManager.removeVGroup(VGROUP_NAME));
+
+ HashMap vGroups = fileVGroupMappingStoreManager.loadVGroups();
+ assertNull(vGroups.get(VGROUP_NAME));
+ }
+
+ @Test
+ public void testLoadVGroups() throws IOException {
+ HashMap expectedMapping = new HashMap<>();
+ expectedMapping.put(VGROUP_NAME, UNIT);
+ File file = new File(STORE_PATH);
+ FileUtils.writeStringToFile(file, "{\"testVGroup\":\"testUnit\"}", StandardCharsets.UTF_8);
+
+ HashMap actualMapping = fileVGroupMappingStoreManager.loadVGroups();
+ assertEquals(expectedMapping, actualMapping);
+ }
+
+ @Test
+ public void testSave() {
+ HashMap vGroupMapping = new HashMap<>();
+ vGroupMapping.put(VGROUP_NAME, UNIT);
+
+ assertTrue(fileVGroupMappingStoreManager.save(vGroupMapping));
+
+ File file = new File(STORE_PATH);
+ assertTrue(file.exists());
+
+ try {
+ String content = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
+ assertEquals("{\"testVGroup\":\"testUnit\"}", content);
+ } catch (IOException e) {
+ fail("Failed to read the file content");
+ }
+ }
+
+ @Test
+ public void testAddVGroupFailure() {
+ FileVGroupMappingStoreManager spyManager = spy(new FileVGroupMappingStoreManager( "src/test/resources"));
+ doReturn(false).when(spyManager).save(any(HashMap.class));
+ MappingDO mappingDO = new MappingDO();
+ mappingDO.setVGroup(VGROUP_NAME);
+ mappingDO.setUnit(UNIT);
+
+ assertFalse(spyManager.addVGroup(mappingDO));
+ }
+
+ @Test
+ public void testRemoveVGroupFailure() {
+ FileVGroupMappingStoreManager spyManager = spy(new FileVGroupMappingStoreManager("src/test/resources"));
+ doReturn(false).when(spyManager).save(any(HashMap.class));
+ MappingDO mappingDO = new MappingDO();
+ mappingDO.setVGroup(VGROUP_NAME);
+ mappingDO.setUnit(UNIT);
+
+ spyManager.addVGroup(mappingDO);
+ assertFalse(spyManager.removeVGroup(VGROUP_NAME));
+ }
+}
diff --git a/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java b/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java
new file mode 100644
index 00000000000..fac04d66de1
--- /dev/null
+++ b/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.storage.redis.store;
+
+import org.apache.seata.common.metadata.namingserver.Instance;
+import org.apache.seata.core.store.MappingDO;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import java.util.Map;
+
+
+@SpringBootTest
+public class RedisVGroupMappingStoreManagerTest {
+ private RedisVGroupMappingStoreManager redisVGroupMappingStoreManager;
+
+ @BeforeEach
+ public void setUp() {
+ redisVGroupMappingStoreManager = new RedisVGroupMappingStoreManager();
+ }
+
+ @Test
+ public void testLoadVGroups() {
+ Instance instance = Instance.getInstance();
+ instance.setNamespace("public");
+ instance.setClusterName("testCluster");
+ instance.setUnit("123");
+ MappingDO mappingDO = new MappingDO();
+ mappingDO.setVGroup("testVGroup");
+ mappingDO.setCluster("testCluster");
+ mappingDO.setNamespace("public");
+ redisVGroupMappingStoreManager.addVGroup(mappingDO);
+ Map map = redisVGroupMappingStoreManager.loadVGroups();
+ Assertions.assertTrue(map.containsKey("testVGroup"));
+ redisVGroupMappingStoreManager.removeVGroup("testVGroup");
+ map = redisVGroupMappingStoreManager.loadVGroups();
+ Assertions.assertFalse(map.containsKey("testVGroup"));
+ }
+}