diff --git a/dependencies/pom.xml b/dependencies/pom.xml
index 9c11cce0f28..42622605d61 100644
--- a/dependencies/pom.xml
+++ b/dependencies/pom.xml
@@ -113,6 +113,7 @@
2.23.4
3.12.2
9.4.38.v20210224
+ 4.9.0
1.12.17
@@ -687,6 +688,12 @@
jetty-servlet
${jetty-version}
+
+ org.apache.rocketmq
+ rocketmq-client
+ ${rocketmq-version}
+ true
+
diff --git a/integration-tx-api/pom.xml b/integration-tx-api/pom.xml
index 12cc16bc2d6..5f7410f67d7 100644
--- a/integration-tx-api/pom.xml
+++ b/integration-tx-api/pom.xml
@@ -14,8 +14,8 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
-
io.seata
diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/DefaultCommonFenceHandler.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/DefaultCommonFenceHandler.java
index e853b46c9bb..82d1e23b52e 100644
--- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/DefaultCommonFenceHandler.java
+++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/DefaultCommonFenceHandler.java
@@ -28,10 +28,6 @@ public class DefaultCommonFenceHandler implements FenceHandler {
private FenceHandler fenceHandler;
- private static class SingletonHolder {
- private static final DefaultCommonFenceHandler INSTANCE = new DefaultCommonFenceHandler();
- }
-
public static DefaultCommonFenceHandler get() {
return DefaultCommonFenceHandler.SingletonHolder.INSTANCE;
}
@@ -69,4 +65,8 @@ public int deleteFenceByDate(Date datetime) {
check();
return fenceHandler.deleteFenceByDate(datetime);
}
+
+ private static class SingletonHolder {
+ private static final DefaultCommonFenceHandler INSTANCE = new DefaultCommonFenceHandler();
+ }
}
diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/config/CommonFenceConfig.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/config/CommonFenceConfig.java
index 1023fa16916..55aa771e3e7 100644
--- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/config/CommonFenceConfig.java
+++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/config/CommonFenceConfig.java
@@ -46,24 +46,20 @@ public class CommonFenceConfig implements Disposable {
* Common fence clean period max value. maximum interval is 68 years
*/
private static final Duration MAX_PERIOD = Duration.ofSeconds(Integer.MAX_VALUE);
-
+ /**
+ * Common fence clean scheduled thread pool
+ */
+ private final ScheduledThreadPoolExecutor tccFenceClean = new ScheduledThreadPoolExecutor(1,
+ new NamedThreadFactory("tccFenceClean", 1));
/**
* Common fence clean period. only duration type format are supported
*/
private Duration cleanPeriod = Duration.ofDays(DefaultValues.DEFAULT_COMMON_FENCE_CLEAN_PERIOD);
-
/**
* Common fence log table name
*/
private String logTableName = DefaultValues.DEFAULT_COMMON_FENCE_LOG_TABLE_NAME;
-
- /**
- * Common fence clean scheduled thread pool
- */
- private final ScheduledThreadPoolExecutor tccFenceClean = new ScheduledThreadPoolExecutor(1,
- new NamedThreadFactory("tccFenceClean", 1));
-
public AtomicBoolean getInitialized() {
return initialized;
}
diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/CommonFenceStoreDataBaseDAO.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/CommonFenceStoreDataBaseDAO.java
index 589f1e0a323..3ea8148d354 100644
--- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/CommonFenceStoreDataBaseDAO.java
+++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/CommonFenceStoreDataBaseDAO.java
@@ -43,13 +43,12 @@
*/
public class CommonFenceStoreDataBaseDAO implements CommonFenceStore {
+ private static volatile CommonFenceStoreDataBaseDAO instance = null;
/**
* Common fence log table name
*/
private String logTableName = DefaultValues.DEFAULT_COMMON_FENCE_LOG_TABLE_NAME;
- private static volatile CommonFenceStoreDataBaseDAO instance = null;
-
private CommonFenceStoreDataBaseDAO() {}
public static CommonFenceStore getInstance() {
diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/sql/CommonFenceStoreSqls.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/sql/CommonFenceStoreSqls.java
index 842202a2366..3b590151f85 100644
--- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/sql/CommonFenceStoreSqls.java
+++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/sql/CommonFenceStoreSqls.java
@@ -24,10 +24,6 @@
*/
public class CommonFenceStoreSqls {
- private CommonFenceStoreSqls() {
- throw new IllegalStateException("Utility class");
- }
-
/**
* The constant LOCAL_TCC_LOG_PLACEHOLD.
*/
@@ -46,21 +42,27 @@ private CommonFenceStoreSqls() {
+ " (xid, branch_id, action_name, status, gmt_create, gmt_modified) "
+ " values (?, ?, ?, ?, ?, ?) ";
+ /**
+ * The constant QUERY_END_STATUS_BY_DATE.
+ */
+ protected static final String QUERY_END_STATUS_BY_DATE = "select xid, branch_id, status, gmt_create, gmt_modified "
+ + "from " + LOCAL_TCC_LOG_PLACEHOLD
+ + " where gmt_modified < ? "
+ + " and status in (" + CommonFenceConstant.STATUS_COMMITTED + " , " + CommonFenceConstant.STATUS_ROLLBACKED + " , " + CommonFenceConstant.STATUS_SUSPENDED + ")"
+ + " limit ?";
+
/**
* The constant QUERY_BY_BRANCH_ID_AND_XID.
*/
protected static final String QUERY_BY_BRANCH_ID_AND_XID = "select xid, branch_id, status, gmt_create, gmt_modified "
+ "from " + LOCAL_TCC_LOG_PLACEHOLD
+ " where xid = ? and branch_id = ? for update";
-
/**
- * The constant QUERY_END_STATUS_BY_DATE.
+ * The constant DELETE_BY_DATE_AND_STATUS.
*/
- protected static final String QUERY_END_STATUS_BY_DATE = "select xid, branch_id, status, gmt_create, gmt_modified "
- + "from " + LOCAL_TCC_LOG_PLACEHOLD
- + " where gmt_modified < ? "
- + " and status in (" + CommonFenceConstant.STATUS_COMMITTED + " , " + CommonFenceConstant.STATUS_ROLLBACKED + " , " + CommonFenceConstant.STATUS_SUSPENDED + ")"
- + " limit ?";
+ protected static final String DELETE_BY_DATE_AND_STATUS = "delete from " + LOCAL_TCC_LOG_PLACEHOLD
+ + " where gmt_modified < ? "
+ + " and status in (" + CommonFenceConstant.STATUS_COMMITTED + " , " + CommonFenceConstant.STATUS_ROLLBACKED + " , " + CommonFenceConstant.STATUS_SUSPENDED + ")";
/**
* The constant UPDATE_STATUS_BY_BRANCH_ID_AND_XID.
@@ -79,12 +81,9 @@ private CommonFenceStoreSqls() {
protected static final String DELETE_BY_BRANCH_XIDS = "delete from " + LOCAL_TCC_LOG_PLACEHOLD + " where xid in (" + PRAMETER_PLACEHOLD + ")";
- /**
- * The constant DELETE_BY_DATE_AND_STATUS.
- */
- protected static final String DELETE_BY_DATE_AND_STATUS = "delete from " + LOCAL_TCC_LOG_PLACEHOLD
- + " where gmt_modified < ? "
- + " and status in (" + CommonFenceConstant.STATUS_COMMITTED + " , " + CommonFenceConstant.STATUS_ROLLBACKED + " , " + CommonFenceConstant.STATUS_SUSPENDED + ")";
+ private CommonFenceStoreSqls() {
+ throw new IllegalStateException("Utility class");
+ }
public static String getInsertLocalTCCLogSQL(String localTccTable) {
return INSERT_LOCAL_TCC_LOG.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable);
diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java
index 525d4c9392b..f389077de39 100644
--- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java
+++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java
@@ -72,43 +72,21 @@ public class GlobalTransactionalInterceptorHandler extends AbstractProxyInvocati
private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
private final GlobalLockTemplate globalLockTemplate = new GlobalLockTemplate();
-
- private Set methodsToProxy;
+ private static int degradeCheckAllowTimes;
private volatile boolean disable;
private static final AtomicBoolean ATOMIC_DEGRADE_CHECK = new AtomicBoolean(false);
private static volatile Integer degradeNum = 0;
private static volatile Integer reachNum = 0;
- private static int degradeCheckAllowTimes;
- protected AspectTransactional aspectTransactional;
private static int degradeCheckPeriod;
-
private static int defaultGlobalTransactionTimeout = 0;
-
private final FailureHandler failureHandler;
+ protected AspectTransactional aspectTransactional;
+ private Set methodsToProxy;
private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);
private static volatile ScheduledThreadPoolExecutor executor;
- private void initDefaultGlobalTransactionTimeout() {
- if (GlobalTransactionalInterceptorHandler.defaultGlobalTransactionTimeout <= 0) {
- int defaultGlobalTransactionTimeout;
- try {
- defaultGlobalTransactionTimeout = ConfigurationFactory.getInstance().getInt(
- ConfigurationKeys.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
- } catch (Exception e) {
- LOGGER.error("Illegal global transaction timeout value: " + e.getMessage());
- defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
- }
- if (defaultGlobalTransactionTimeout <= 0) {
- LOGGER.warn("Global transaction timeout value '{}' is illegal, and has been reset to the default value '{}'",
- defaultGlobalTransactionTimeout, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
- defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
- }
- GlobalTransactionalInterceptorHandler.defaultGlobalTransactionTimeout = defaultGlobalTransactionTimeout;
- }
- }
-
public GlobalTransactionalInterceptorHandler(FailureHandler failureHandler, Set methodsToProxy) {
this.failureHandler = failureHandler == null ? FailureHandlerHolder.getFailureHandler() : failureHandler;
this.methodsToProxy = methodsToProxy;
@@ -134,6 +112,25 @@ public GlobalTransactionalInterceptorHandler(FailureHandler failureHandler, Set<
this.aspectTransactional = aspectTransactional;
}
+ private void initDefaultGlobalTransactionTimeout() {
+ if (GlobalTransactionalInterceptorHandler.defaultGlobalTransactionTimeout <= 0) {
+ int defaultGlobalTransactionTimeout;
+ try {
+ defaultGlobalTransactionTimeout = ConfigurationFactory.getInstance().getInt(
+ ConfigurationKeys.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
+ } catch (Exception e) {
+ LOGGER.error("Illegal global transaction timeout value: " + e.getMessage());
+ defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
+ }
+ if (defaultGlobalTransactionTimeout <= 0) {
+ LOGGER.warn("Global transaction timeout value '{}' is illegal, and has been reset to the default value '{}'",
+ defaultGlobalTransactionTimeout, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
+ defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
+ }
+ GlobalTransactionalInterceptorHandler.defaultGlobalTransactionTimeout = defaultGlobalTransactionTimeout;
+ }
+ }
+
@Override
public Set getMethodsToProxy() {
return methodsToProxy;
diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultInterfaceParser.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultInterfaceParser.java
index c47b361e97b..3200de44dae 100644
--- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultInterfaceParser.java
+++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultInterfaceParser.java
@@ -31,18 +31,14 @@ public class DefaultInterfaceParser implements InterfaceParser {
protected static final List ALL_INTERFACE_PARSERS = new ArrayList<>();
- private static class SingletonHolder {
- private static final DefaultInterfaceParser INSTANCE = new DefaultInterfaceParser();
+ protected DefaultInterfaceParser() {
+ initInterfaceParser();
}
public static DefaultInterfaceParser get() {
return DefaultInterfaceParser.SingletonHolder.INSTANCE;
}
- protected DefaultInterfaceParser() {
- initInterfaceParser();
- }
-
/**
* init parsers
*/
@@ -64,4 +60,8 @@ public ProxyInvocationHandler parserInterfaceToProxy(Object target) throws Excep
return null;
}
+ private static class SingletonHolder {
+ private static final DefaultInterfaceParser INSTANCE = new DefaultInterfaceParser();
+ }
+
}
\ No newline at end of file
diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultResourceRegisterParser.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultResourceRegisterParser.java
index 8e9451c2982..5202243c721 100644
--- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultResourceRegisterParser.java
+++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultResourceRegisterParser.java
@@ -29,22 +29,18 @@ public class DefaultResourceRegisterParser {
protected static List allRegisterResourceParsers = new ArrayList<>();
- public void registerResource(Object target) {
- for (RegisterResourceParser registerResourceParser : allRegisterResourceParsers) {
- registerResourceParser.registerResource(target);
- }
- }
-
- private static class SingletonHolder {
- private static final DefaultResourceRegisterParser INSTANCE = new DefaultResourceRegisterParser();
+ protected DefaultResourceRegisterParser() {
+ initResourceRegisterParser();
}
public static DefaultResourceRegisterParser get() {
return DefaultResourceRegisterParser.SingletonHolder.INSTANCE;
}
- protected DefaultResourceRegisterParser() {
- initResourceRegisterParser();
+ public void registerResource(Object target) {
+ for (RegisterResourceParser registerResourceParser : allRegisterResourceParsers) {
+ registerResourceParser.registerResource(target);
+ }
}
/**
@@ -57,5 +53,9 @@ protected void initResourceRegisterParser() {
}
}
+ private static class SingletonHolder {
+ private static final DefaultResourceRegisterParser INSTANCE = new DefaultResourceRegisterParser();
+ }
+
}
\ No newline at end of file
diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultTargetClassParser.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultTargetClassParser.java
index 502a39138c8..84bbf7b6c78 100644
--- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultTargetClassParser.java
+++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultTargetClassParser.java
@@ -30,18 +30,14 @@ public class DefaultTargetClassParser implements TargetClassParser {
protected static List allTargetClassParsers = new ArrayList<>();
- private static class SingletonHolder {
- private static final DefaultTargetClassParser INSTANCE = new DefaultTargetClassParser();
+ protected DefaultTargetClassParser() {
+ initTargetClassParser();
}
public static DefaultTargetClassParser get() {
return DefaultTargetClassParser.SingletonHolder.INSTANCE;
}
- protected DefaultTargetClassParser() {
- initTargetClassParser();
- }
-
/**
* init parsers
*/
@@ -73,4 +69,8 @@ public Class>[] findInterfaces(Object target) throws Exception {
}
return target.getClass().getInterfaces();
}
+
+ private static class SingletonHolder {
+ private static final DefaultTargetClassParser INSTANCE = new DefaultTargetClassParser();
+ }
}
diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/json/DefaultJsonParser.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/json/DefaultJsonParser.java
index cc5a450538a..a82944215ec 100644
--- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/json/DefaultJsonParser.java
+++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/json/DefaultJsonParser.java
@@ -31,10 +31,6 @@ public class DefaultJsonParser implements JsonParser {
protected static List allJsonParsers = new ArrayList<>();
- private static class SingletonHolder {
- private static final DefaultJsonParser INSTANCE = new DefaultJsonParser();
- }
-
private DefaultJsonParser() {
initJsonParser();
}
@@ -73,4 +69,8 @@ public T parseObject(String text, Class clazz) {
return null;
}
+ private static class SingletonHolder {
+ private static final DefaultJsonParser INSTANCE = new DefaultJsonParser();
+ }
+
}
diff --git a/script/client/spring/application.properties b/script/client/spring/application.properties
index 662b4de3ced..3fcdc985a56 100755
--- a/script/client/spring/application.properties
+++ b/script/client/spring/application.properties
@@ -15,6 +15,7 @@
#
seata.enabled=true
+seata.rocketmq-enabled=false
seata.scan-packages=firstPackage,secondPackage
seata.excludes-for-scanning=firstBeanNameForExclude,secondBeanNameForExclude
seata.excludes-for-auto-proxying=firstClassNameForExclude,secondClassNameForExclude
diff --git a/script/client/spring/application.yml b/script/client/spring/application.yml
index 9a62f299e30..53f11055979 100755
--- a/script/client/spring/application.yml
+++ b/script/client/spring/application.yml
@@ -1,5 +1,6 @@
seata:
enabled: true
+ rocketmq-enabled: false
application-id: applicationName
tx-service-group: default_tx_group
access-key: aliyunAccessKey
diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/java/io/seata/spring/boot/autoconfigure/properties/SeataProperties.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/java/io/seata/spring/boot/autoconfigure/properties/SeataProperties.java
index a93dd819782..fcf9cc17291 100644
--- a/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/java/io/seata/spring/boot/autoconfigure/properties/SeataProperties.java
+++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/java/io/seata/spring/boot/autoconfigure/properties/SeataProperties.java
@@ -32,6 +32,10 @@ public class SeataProperties {
* whether enable auto configuration
*/
private boolean enabled = true;
+ /**
+ * whether enable rocketmq integrate
+ */
+ private boolean rocketmqEnabled = false;
/**
* application id
*/
@@ -87,6 +91,14 @@ public SeataProperties setEnabled(boolean enabled) {
return this;
}
+ public boolean isRocketmqEnabled() {
+ return rocketmqEnabled;
+ }
+
+ public void setRocketmqEnabled(boolean rocketmqEnabled) {
+ this.rocketmqEnabled = rocketmqEnabled;
+ }
+
public String getApplicationId() {
if (applicationId == null) {
applicationId = springCloudAlibabaConfiguration.getApplicationId();
diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/resources/META-INF/additional-spring-configuration-metadata.json
index 0b20cfc4f20..191beb7c318 100644
--- a/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/resources/META-INF/additional-spring-configuration-metadata.json
+++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/resources/META-INF/additional-spring-configuration-metadata.json
@@ -13,6 +13,13 @@
"sourceType": "io.seata.spring.boot.autoconfigure.properties.SeataProperties",
"defaultValue": "AT"
},
+ {
+ "name": "seata.rocketmq-enabled",
+ "type": "java.lang.Boolean",
+ "sourceType": "io.seata.spring.boot.autoconfigure.properties.SeataProperties",
+ "description": "Enable seata integrate rocketmq",
+ "defaultValue": false
+ },
{
"name": "spring.cloud.alibaba.seata.application-id",
"type": "java.lang.String",
diff --git a/seata-spring-boot-starter/pom.xml b/seata-spring-boot-starter/pom.xml
index ec144db0d90..2f305c3cd47 100644
--- a/seata-spring-boot-starter/pom.xml
+++ b/seata-spring-boot-starter/pom.xml
@@ -63,6 +63,13 @@
spring-boot-configuration-processor
true
+
+
+ org.apache.rocketmq
+ rocketmq-client
+ true
+ provided
+
diff --git a/seata-spring-boot-starter/src/main/java/io/seata/spring/boot/autoconfigure/SeataRocketMQAutoConfiguration.java b/seata-spring-boot-starter/src/main/java/io/seata/spring/boot/autoconfigure/SeataRocketMQAutoConfiguration.java
new file mode 100644
index 00000000000..4c71521ef38
--- /dev/null
+++ b/seata-spring-boot-starter/src/main/java/io/seata/spring/boot/autoconfigure/SeataRocketMQAutoConfiguration.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 1999-2019 Seata.io Group.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.seata.spring.boot.autoconfigure;
+
+import io.seata.rm.tcc.rocketmq.RocketMQAspect;
+import io.seata.rm.tcc.rocketmq.TCCRocketMQ;
+import io.seata.rm.tcc.rocketmq.TCCRocketMQImpl;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.context.annotation.Bean;
+
+@ConditionalOnClass(name = "org.apache.rocketmq.client.producer.DefaultMQProducer")
+@ConditionalOnBean(DefaultMQProducer.class)
+@ConditionalOnExpression("${seata.enabled:true} && ${seata.rocketmq-enabled:false}")
+public class SeataRocketMQAutoConfiguration {
+
+ @Bean
+ @ConditionalOnMissingBean
+ public TCCRocketMQ tccRocketMQ() {
+ return new TCCRocketMQImpl();
+ }
+
+ @Bean
+ public RocketMQAspect rocketMQAspect(TCCRocketMQ tccRocketMQ) {
+ return new RocketMQAspect(tccRocketMQ);
+ }
+}
\ No newline at end of file
diff --git a/seata-spring-boot-starter/src/main/resources/META-INF/spring.factories b/seata-spring-boot-starter/src/main/resources/META-INF/spring.factories
index b83ade816ef..382daf032dc 100644
--- a/seata-spring-boot-starter/src/main/resources/META-INF/spring.factories
+++ b/seata-spring-boot-starter/src/main/resources/META-INF/spring.factories
@@ -3,4 +3,5 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataHttpAutoConfiguration,\
-io.seata.spring.boot.autoconfigure.SeataSagaAutoConfiguration
+io.seata.spring.boot.autoconfigure.SeataSagaAutoConfiguration,\
+io.seata.spring.boot.autoconfigure.SeataRocketMQAutoConfiguration
diff --git a/spring/src/main/java/io/seata/rm/fence/SpringFenceHandler.java b/spring/src/main/java/io/seata/rm/fence/SpringFenceHandler.java
index 8e6669018cb..ea39c66dc09 100644
--- a/spring/src/main/java/io/seata/rm/fence/SpringFenceHandler.java
+++ b/spring/src/main/java/io/seata/rm/fence/SpringFenceHandler.java
@@ -96,6 +96,78 @@ public static void setTransactionTemplate(TransactionTemplate transactionTemplat
SpringFenceHandler.transactionTemplate = transactionTemplate;
}
+ /**
+ * Insert Common fence log
+ *
+ * @param conn the db connection
+ * @param xid the xid
+ * @param branchId the branchId
+ * @param status the status
+ * @return the boolean
+ */
+ private static boolean insertCommonFenceLog(Connection conn, String xid, Long branchId, String actionName, Integer status) {
+ CommonFenceDO commonFenceDO = new CommonFenceDO();
+ commonFenceDO.setXid(xid);
+ commonFenceDO.setBranchId(branchId);
+ commonFenceDO.setActionName(actionName);
+ commonFenceDO.setStatus(status);
+ return COMMON_FENCE_DAO.insertCommonFenceDO(conn, commonFenceDO);
+ }
+
+ /**
+ * Update TCC Fence status and invoke target method
+ *
+ * @param method target method
+ * @param targetTCCBean target bean
+ * @param xid the global transaction id
+ * @param branchId the branch transaction id
+ * @param status the tcc fence status
+ * @return the boolean
+ */
+ private static boolean updateStatusAndInvokeTargetMethod(Connection conn, Method method, Object targetTCCBean,
+ String xid, Long branchId, int status,
+ TransactionStatus transactionStatus,
+ Object[] args) throws Exception {
+ boolean result = COMMON_FENCE_DAO.updateCommonFenceDO(conn, xid, branchId, status, CommonFenceConstant.STATUS_TRIED);
+ if (result) {
+ // invoke two phase method
+ Object ret = method.invoke(targetTCCBean, args);
+ if (null != ret) {
+ if (ret instanceof TwoPhaseResult) {
+ result = ((TwoPhaseResult) ret).isSuccess();
+ } else {
+ result = (boolean) ret;
+ }
+ // If the business execution result is false, the transaction will be rolled back
+ if (!result) {
+ transactionStatus.setRollbackOnly();
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Delete Common Fence
+ *
+ * @param xid the global transaction id
+ * @param branchId the branch transaction id
+ * @return the boolean
+ */
+ public static boolean deleteFence(String xid, Long branchId) {
+ return transactionTemplate.execute(status -> {
+ boolean ret = false;
+ try {
+ Connection conn = DataSourceUtils.getConnection(dataSource);
+ ret = COMMON_FENCE_DAO.deleteCommonFenceDO(conn, xid, branchId);
+ } catch (RuntimeException e) {
+ status.setRollbackOnly();
+ LOGGER.error("delete fence log failed, xid: {}, branchId: {}", xid, branchId, e);
+ }
+ return ret;
+ });
+ }
+
/**
* common prepare method enhanced
*
@@ -218,78 +290,6 @@ public boolean rollbackFence(Method rollbackMethod, Object targetTCCBean,
});
}
- /**
- * Insert Common fence log
- *
- * @param conn the db connection
- * @param xid the xid
- * @param branchId the branchId
- * @param status the status
- * @return the boolean
- */
- private static boolean insertCommonFenceLog(Connection conn, String xid, Long branchId, String actionName, Integer status) {
- CommonFenceDO commonFenceDO = new CommonFenceDO();
- commonFenceDO.setXid(xid);
- commonFenceDO.setBranchId(branchId);
- commonFenceDO.setActionName(actionName);
- commonFenceDO.setStatus(status);
- return COMMON_FENCE_DAO.insertCommonFenceDO(conn, commonFenceDO);
- }
-
- /**
- * Update TCC Fence status and invoke target method
- *
- * @param method target method
- * @param targetTCCBean target bean
- * @param xid the global transaction id
- * @param branchId the branch transaction id
- * @param status the tcc fence status
- * @return the boolean
- */
- private static boolean updateStatusAndInvokeTargetMethod(Connection conn, Method method, Object targetTCCBean,
- String xid, Long branchId, int status,
- TransactionStatus transactionStatus,
- Object[] args) throws Exception {
- boolean result = COMMON_FENCE_DAO.updateCommonFenceDO(conn, xid, branchId, status, CommonFenceConstant.STATUS_TRIED);
- if (result) {
- // invoke two phase method
- Object ret = method.invoke(targetTCCBean, args);
- if (null != ret) {
- if (ret instanceof TwoPhaseResult) {
- result = ((TwoPhaseResult) ret).isSuccess();
- } else {
- result = (boolean) ret;
- }
- // If the business execution result is false, the transaction will be rolled back
- if (!result) {
- transactionStatus.setRollbackOnly();
- }
- }
- }
- return result;
- }
-
- /**
- * Delete Common Fence
- *
- * @param xid the global transaction id
- * @param branchId the branch transaction id
- * @return the boolean
- */
- public static boolean deleteFence(String xid, Long branchId) {
- return transactionTemplate.execute(status -> {
- boolean ret = false;
- try {
- Connection conn = DataSourceUtils.getConnection(dataSource);
- ret = COMMON_FENCE_DAO.deleteCommonFenceDO(conn, xid, branchId);
- } catch (RuntimeException e) {
- status.setRollbackOnly();
- LOGGER.error("delete fence log failed, xid: {}, branchId: {}", xid, branchId, e);
- }
- return ret;
- });
- }
-
/**
* Delete Common Fence By Datetime
*
diff --git a/spring/src/main/java/io/seata/spring/annotation/EnableRocketMQAspect.java b/spring/src/main/java/io/seata/spring/annotation/EnableRocketMQAspect.java
new file mode 100644
index 00000000000..daf59301fda
--- /dev/null
+++ b/spring/src/main/java/io/seata/spring/annotation/EnableRocketMQAspect.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 1999-2019 Seata.io Group.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.seata.spring.annotation;
+
+import io.seata.rm.tcc.rocketmq.RocketMQAspect;
+import io.seata.rm.tcc.rocketmq.TCCRocketMQImpl;
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.springframework.context.annotation.Import;
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Import({RocketMQAspect.class, TCCRocketMQImpl.class})
+public @interface EnableRocketMQAspect {
+}
\ No newline at end of file
diff --git a/spring/src/main/java/io/seata/spring/annotation/scannercheckers/PackageScannerChecker.java b/spring/src/main/java/io/seata/spring/annotation/scannercheckers/PackageScannerChecker.java
index 30c849be5de..86cd021b3dc 100644
--- a/spring/src/main/java/io/seata/spring/annotation/scannercheckers/PackageScannerChecker.java
+++ b/spring/src/main/java/io/seata/spring/annotation/scannercheckers/PackageScannerChecker.java
@@ -58,7 +58,7 @@ public static void addScannablePackages(String... packages) {
@Override
public boolean check(Object bean, String beanName, @Nullable ConfigurableListableBeanFactory beanFactory) throws Exception {
- if (SCANNABLE_PACKAGE_SET.isEmpty()) {
+ if (SCANNABLE_PACKAGE_SET.isEmpty() || bean.getClass().getName().startsWith("io.seata")) {
// if empty, pass this checker
return true;
}
diff --git a/tcc/pom.xml b/tcc/pom.xml
index a9f9e847841..3f36abb01b7 100644
--- a/tcc/pom.xml
+++ b/tcc/pom.xml
@@ -52,6 +52,14 @@
com.alibaba
fastjson
+
+
+
+ org.apache.rocketmq
+ rocketmq-client
+ true
+ provided
+
${project.groupId}
seata-integration-tx-api
diff --git a/tcc/src/main/java/io/seata/rm/tcc/rocketmq/RocketMQAspect.java b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/RocketMQAspect.java
new file mode 100644
index 00000000000..7619dfa1d05
--- /dev/null
+++ b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/RocketMQAspect.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 1999-2019 Seata.io Group.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.seata.rm.tcc.rocketmq;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+
+public class RocketMQAspect implements BeanPostProcessor {
+
+ public static Logger LOGGER = LoggerFactory.getLogger(RocketMQAspect.class);
+
+ private final TCCRocketMQ tccRocketMQ;
+
+ public RocketMQAspect(TCCRocketMQ tccRocketMQ) {
+ this.tccRocketMQ = tccRocketMQ;
+ }
+
+ @Override
+ public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
+ return bean;
+ }
+
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+ if (bean instanceof DefaultMQProducer) {
+ LOGGER.info("Generate RocketMQ Producer Proxy");
+ tccRocketMQ.setDefaultMQProducer((DefaultMQProducer) bean);
+ return new SeataMQProducer((DefaultMQProducer) bean, tccRocketMQ);
+ }
+ return bean;
+ }
+}
diff --git a/tcc/src/main/java/io/seata/rm/tcc/rocketmq/RocketMQUtils.java b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/RocketMQUtils.java
new file mode 100644
index 00000000000..9103225719a
--- /dev/null
+++ b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/RocketMQUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 1999-2019 Seata.io Group.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.seata.rm.tcc.rocketmq;
+
+import java.net.UnknownHostException;
+import org.apache.rocketmq.client.Validators;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class RocketMQUtils {
+
+ public static SendResult halfSend(DefaultMQProducer defaultMQProducer,
+ Message msg) throws MQClientException {
+ // ignore DelayTimeLevel parameter
+ if (msg.getDelayTimeLevel() != 0) {
+ MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+ }
+
+ Validators.checkMessage(msg, defaultMQProducer);
+
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, defaultMQProducer.getProducerGroup());
+ SendResult sendResult;
+ try {
+ sendResult = defaultMQProducer.send(msg);
+ } catch (Exception e) {
+ throw new MQClientException("send message Exception", e);
+ }
+
+ switch (sendResult.getSendStatus()) {
+ case SEND_OK: {
+ if (sendResult.getTransactionId() != null) {
+ msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
+ }
+ String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+ if (null != transactionId && !"".equals(transactionId)) {
+ msg.setTransactionId(transactionId);
+ }
+ }
+ break;
+ case FLUSH_DISK_TIMEOUT:
+ case FLUSH_SLAVE_TIMEOUT:
+ case SLAVE_NOT_AVAILABLE:
+ default:
+ throw new RuntimeException("Message send fail.");
+ }
+ return sendResult;
+ }
+
+ public static void confirm(DefaultMQProducer defaultMQProducer, Message msg,
+ SendResult sendResult) throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException {
+ DefaultMQProducerImpl defaultMQProducerImpl = defaultMQProducer.getDefaultMQProducerImpl();
+ defaultMQProducerImpl.endTransaction(msg, sendResult, LocalTransactionState.COMMIT_MESSAGE, null);
+ }
+
+ public static void cancel(DefaultMQProducer defaultMQProducer, Message msg,
+ SendResult sendResult) throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException {
+ DefaultMQProducerImpl defaultMQProducerImpl = defaultMQProducer.getDefaultMQProducerImpl();
+ defaultMQProducerImpl.endTransaction(msg, sendResult, LocalTransactionState.ROLLBACK_MESSAGE, null);
+ }
+}
diff --git a/tcc/src/main/java/io/seata/rm/tcc/rocketmq/SeataMQProducer.java b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/SeataMQProducer.java
new file mode 100644
index 00000000000..3c08c3801c1
--- /dev/null
+++ b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/SeataMQProducer.java
@@ -0,0 +1,303 @@
+/*
+ * Copyright 1999-2019 Seata.io Group.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.seata.rm.tcc.rocketmq;
+
+import io.seata.core.context.RootContext;
+import java.util.Collection;
+import java.util.List;
+import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.RequestCallback;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.TransactionSendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SeataMQProducer implements MQProducer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SeataMQProducer.class);
+
+ private final DefaultMQProducer target;
+ private final TCCRocketMQ tccRocketMQ;
+
+ public SeataMQProducer(DefaultMQProducer target, TCCRocketMQ tccRocketMQ) {
+ this.target = target;
+ this.tccRocketMQ = tccRocketMQ;
+ }
+
+ @Override
+ public void start() throws MQClientException {
+ target.start();
+ }
+
+ @Override
+ public void shutdown() {
+ target.shutdown();
+ }
+
+ @Override
+ public List fetchPublishMessageQueues(String topic) throws MQClientException {
+ return target.fetchPublishMessageQueues(topic);
+ }
+
+ @Override
+ public SendResult send(
+ Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ if (RootContext.inGlobalTransaction()) {
+ LOGGER.info("DefaultMQProducer send is in Global Transaction, send() will be proxy");
+ return tccRocketMQ.prepare(null, msg);
+ } else {
+ LOGGER.info("Not in Global Transaction, send() will be proxy");
+ return target.send(msg);
+ }
+ }
+
+ @Override
+ public SendResult send(Message msg,
+ long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return target.send(msg, timeout);
+ }
+
+ @Override
+ public void send(Message msg,
+ SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
+ target.send(msg, sendCallback);
+ }
+
+ @Override
+ public void send(Message msg, SendCallback sendCallback,
+ long timeout) throws MQClientException, RemotingException, InterruptedException {
+ target.send(msg, sendCallback, timeout);
+ }
+
+ @Override
+ public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
+ target.sendOneway(msg);
+ }
+
+ @Override
+ public SendResult send(Message msg,
+ MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return target.send(msg, mq);
+ }
+
+ @Override
+ public SendResult send(Message msg, MessageQueue mq,
+ long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return target.send(msg, mq, timeout);
+ }
+
+ @Override
+ public void send(Message msg, MessageQueue mq,
+ SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
+ target.send(msg, mq, sendCallback);
+ }
+
+ @Override
+ public void send(Message msg, MessageQueue mq, SendCallback sendCallback,
+ long timeout) throws MQClientException, RemotingException, InterruptedException {
+ target.send(msg, mq, sendCallback, timeout);
+ }
+
+ @Override
+ public void sendOneway(Message msg,
+ MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
+ target.sendOneway(msg, mq);
+ }
+
+ @Override
+ public SendResult send(Message msg, MessageQueueSelector selector,
+ Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return target.send(msg, selector, arg);
+ }
+
+ @Override
+ public SendResult send(Message msg, MessageQueueSelector selector, Object arg,
+ long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return target.send(msg, selector, arg, timeout);
+ }
+
+ @Override
+ public void send(Message msg, MessageQueueSelector selector, Object arg,
+ SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
+ target.send(msg, selector, arg, sendCallback);
+ }
+
+ @Override
+ public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback,
+ long timeout) throws MQClientException, RemotingException, InterruptedException {
+ target.send(msg, selector, arg, sendCallback, timeout);
+ }
+
+ @Override
+ public Message request(Message msg,
+ long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return target.request(msg, timeout);
+ }
+
+ @Override
+ public void request(Message msg, RequestCallback requestCallback,
+ long timeout) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+ target.request(msg, requestCallback, timeout);
+ }
+
+ @Override
+ public Message request(Message msg, MessageQueueSelector selector, Object arg,
+ long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
+ return target.request(msg, selector, arg, timeout);
+ }
+
+ @Override
+ public void request(Message msg, MessageQueueSelector selector, Object arg, RequestCallback requestCallback,
+ long timeout) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+ target.request(msg, selector, arg, requestCallback, timeout);
+ }
+
+ @Override
+ public Message request(Message msg, MessageQueue mq,
+ long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
+ return target.request(msg, mq, timeout);
+ }
+
+ @Override
+ public void request(Message msg, MessageQueue mq, RequestCallback requestCallback,
+ long timeout) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+ target.request(msg, mq, requestCallback, timeout);
+ }
+
+ @Override
+ public void sendOneway(Message msg, MessageQueueSelector selector,
+ Object arg) throws MQClientException, RemotingException, InterruptedException {
+ target.sendOneway(msg, selector, arg);
+ }
+
+ @Override
+ public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter,
+ Object arg) throws MQClientException {
+ return target.sendMessageInTransaction(msg, tranExecuter, arg);
+ }
+
+ @Override
+ public TransactionSendResult sendMessageInTransaction(Message msg, Object arg) throws MQClientException {
+ return target.sendMessageInTransaction(msg, arg);
+ }
+
+ @Override
+ public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+ target.createTopic(key, newTopic, queueNum);
+ }
+
+ @Override
+ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+ target.createTopic(key, newTopic, queueNum, topicSysFlag);
+ }
+
+ @Override
+ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+ return target.searchOffset(mq, timestamp);
+ }
+
+ @Override
+ public long maxOffset(MessageQueue mq) throws MQClientException {
+ return target.maxOffset(mq);
+ }
+
+ @Override
+ public long minOffset(MessageQueue mq) throws MQClientException {
+ return target.minOffset(mq);
+ }
+
+ @Override
+ public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+ return target.earliestMsgStoreTime(mq);
+ }
+
+ @Override
+ public MessageExt viewMessage(
+ String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ return target.viewMessage(offsetMsgId);
+ }
+
+ @Override
+ public QueryResult queryMessage(String topic, String key, int maxNum, long begin,
+ long end) throws MQClientException, InterruptedException {
+ return target.queryMessage(topic, key, maxNum, begin, end);
+ }
+
+ @Override
+ public MessageExt viewMessage(String topic,
+ String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ return target.viewMessage(topic, msgId);
+ }
+
+ @Override
+ public SendResult send(
+ Collection msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return target.send(msgs);
+ }
+
+ @Override
+ public SendResult send(Collection msgs,
+ long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return target.send(msgs, timeout);
+ }
+
+ @Override
+ public SendResult send(Collection msgs,
+ MessageQueue messageQueue) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return target.send(msgs, messageQueue);
+ }
+
+ @Override
+ public SendResult send(Collection msgs, MessageQueue messageQueue,
+ long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return target.send(msgs, messageQueue, timeout);
+ }
+
+ @Override
+ public void send(Collection msgs,
+ SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ target.send(msgs, sendCallback);
+ }
+
+ @Override
+ public void send(Collection msgs, SendCallback sendCallback,
+ long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ target.send(msgs, sendCallback, timeout);
+ }
+
+ @Override
+ public void send(Collection msgs, MessageQueue mq,
+ SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ target.send(msgs, mq, sendCallback);
+ }
+
+ @Override
+ public void send(Collection msgs, MessageQueue mq, SendCallback sendCallback,
+ long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ target.send(msgs, mq, sendCallback, timeout);
+ }
+}
diff --git a/tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQ.java b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQ.java
new file mode 100644
index 00000000000..56122fba9b7
--- /dev/null
+++ b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQ.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 1999-2019 Seata.io Group.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.seata.rm.tcc.rocketmq;
+
+import io.seata.rm.tcc.api.BusinessActionContext;
+import io.seata.rm.tcc.api.LocalTCC;
+import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
+import java.net.UnknownHostException;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+@LocalTCC
+public interface TCCRocketMQ {
+
+ void setDefaultMQProducer(DefaultMQProducer producer);
+
+ /**
+ * RocketMQ half send
+ *
+ * @param context thre context
+ * @param message the message
+ * @return SendResult
+ * @throws MQBrokerException
+ * @throws RemotingException
+ * @throws NoSuchFieldException
+ * @throws InterruptedException
+ * @throws MQClientException
+ */
+ @TwoPhaseBusinessAction(name = "tccRocketMQ", commitMethod = "commit", rollbackMethod = "rollback")
+ SendResult prepare(BusinessActionContext context, Message message)
+ throws MQBrokerException, RemotingException, InterruptedException, MQClientException;
+
+ /**
+ * RocketMQ half send commit
+ *
+ * @param context the BusinessActionContext
+ * @return SendResult
+ * @throws UnknownHostException
+ * @throws MQBrokerException
+ * @throws RemotingException
+ * @throws NoSuchFieldException
+ * @throws InterruptedException
+ */
+ boolean commit(BusinessActionContext context)
+ throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException;
+
+ /**
+ * RocketMQ half send rollback
+ *
+ * @param context the BusinessActionContext
+ * @return
+ * @throws UnknownHostException
+ * @throws MQBrokerException
+ * @throws RemotingException
+ * @throws NoSuchFieldException
+ * @throws InterruptedException
+ */
+ boolean rollback(BusinessActionContext context)
+ throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException;
+}
diff --git a/tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQImpl.java b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQImpl.java
new file mode 100644
index 00000000000..177e6313655
--- /dev/null
+++ b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 1999-2019 Seata.io Group.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.seata.rm.tcc.rocketmq;
+
+import io.seata.rm.tcc.api.BusinessActionContext;
+import io.seata.rm.tcc.api.BusinessActionContextUtil;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TCCRocketMQImpl implements TCCRocketMQ {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TCCRocketMQImpl.class);
+
+ private DefaultMQProducer defaultMQProducer;
+
+ @Override
+ public void setDefaultMQProducer(DefaultMQProducer defaultMQProducer) {
+ this.defaultMQProducer = defaultMQProducer;
+ }
+
+ @Override
+ public SendResult prepare(BusinessActionContext context, Message message)
+ throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
+ SendResult sendResult = RocketMQUtils.halfSend(defaultMQProducer, message);
+ LOGGER.info("RocketMQ message send prepare, xid = {}, bid = {}", context.getXid(), context.getBranchId());
+ Map params = new HashMap<>(2);
+ params.put("message", message);
+ params.put("sendResult", sendResult);
+ BusinessActionContextUtil.addContext(params);
+ return sendResult;
+ }
+
+ @Override
+ public boolean commit(BusinessActionContext context)
+ throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException {
+ Message message = context.getActionContext("message", Message.class);
+ SendResult sendResult = context.getActionContext("sendResult", SendResult.class);
+ RocketMQUtils.confirm(defaultMQProducer, message, sendResult);
+ LOGGER.info("RocketMQ message send commit, xid = {}, branchId = {}", context.getXid(), context.getBranchId());
+ return true;
+ }
+
+ @Override
+ public boolean rollback(BusinessActionContext context)
+ throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException {
+ Message message = context.getActionContext("message", Message.class);
+ SendResult sendResult = context.getActionContext("sendResult", SendResult.class);
+ RocketMQUtils.cancel(defaultMQProducer, message, sendResult);
+ LOGGER.info("RocketMQ message send rollback, xid = {}, branchId = {}", context.getXid(), context.getBranchId());
+ return true;
+ }
+
+}
diff --git a/tm/src/main/java/io/seata/tm/api/FailureHandlerHolder.java b/tm/src/main/java/io/seata/tm/api/FailureHandlerHolder.java
index effbc93fec9..65af1efefe7 100644
--- a/tm/src/main/java/io/seata/tm/api/FailureHandlerHolder.java
+++ b/tm/src/main/java/io/seata/tm/api/FailureHandlerHolder.java
@@ -23,14 +23,14 @@ public class FailureHandlerHolder {
private static FailureHandler FAILURE_HANDLER_HOLDER = new DefaultFailureHandlerImpl();
+ public static FailureHandler getFailureHandler() {
+ return FAILURE_HANDLER_HOLDER;
+ }
+
public static void setFailureHandler(FailureHandler failureHandler) {
if (failureHandler != null) {
FAILURE_HANDLER_HOLDER = failureHandler;
}
}
- public static FailureHandler getFailureHandler() {
- return FAILURE_HANDLER_HOLDER;
- }
-
}