diff --git a/dependencies/pom.xml b/dependencies/pom.xml index 9c11cce0f28..42622605d61 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -113,6 +113,7 @@ 2.23.4 3.12.2 9.4.38.v20210224 + 4.9.0 1.12.17 @@ -687,6 +688,12 @@ jetty-servlet ${jetty-version} + + org.apache.rocketmq + rocketmq-client + ${rocketmq-version} + true + diff --git a/integration-tx-api/pom.xml b/integration-tx-api/pom.xml index 12cc16bc2d6..5f7410f67d7 100644 --- a/integration-tx-api/pom.xml +++ b/integration-tx-api/pom.xml @@ -14,8 +14,8 @@ ~ See the License for the specific language governing permissions and ~ limitations under the License. --> - io.seata diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/DefaultCommonFenceHandler.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/DefaultCommonFenceHandler.java index e853b46c9bb..82d1e23b52e 100644 --- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/DefaultCommonFenceHandler.java +++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/DefaultCommonFenceHandler.java @@ -28,10 +28,6 @@ public class DefaultCommonFenceHandler implements FenceHandler { private FenceHandler fenceHandler; - private static class SingletonHolder { - private static final DefaultCommonFenceHandler INSTANCE = new DefaultCommonFenceHandler(); - } - public static DefaultCommonFenceHandler get() { return DefaultCommonFenceHandler.SingletonHolder.INSTANCE; } @@ -69,4 +65,8 @@ public int deleteFenceByDate(Date datetime) { check(); return fenceHandler.deleteFenceByDate(datetime); } + + private static class SingletonHolder { + private static final DefaultCommonFenceHandler INSTANCE = new DefaultCommonFenceHandler(); + } } diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/config/CommonFenceConfig.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/config/CommonFenceConfig.java index 1023fa16916..55aa771e3e7 100644 --- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/config/CommonFenceConfig.java +++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/config/CommonFenceConfig.java @@ -46,24 +46,20 @@ public class CommonFenceConfig implements Disposable { * Common fence clean period max value. maximum interval is 68 years */ private static final Duration MAX_PERIOD = Duration.ofSeconds(Integer.MAX_VALUE); - + /** + * Common fence clean scheduled thread pool + */ + private final ScheduledThreadPoolExecutor tccFenceClean = new ScheduledThreadPoolExecutor(1, + new NamedThreadFactory("tccFenceClean", 1)); /** * Common fence clean period. only duration type format are supported */ private Duration cleanPeriod = Duration.ofDays(DefaultValues.DEFAULT_COMMON_FENCE_CLEAN_PERIOD); - /** * Common fence log table name */ private String logTableName = DefaultValues.DEFAULT_COMMON_FENCE_LOG_TABLE_NAME; - - /** - * Common fence clean scheduled thread pool - */ - private final ScheduledThreadPoolExecutor tccFenceClean = new ScheduledThreadPoolExecutor(1, - new NamedThreadFactory("tccFenceClean", 1)); - public AtomicBoolean getInitialized() { return initialized; } diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/CommonFenceStoreDataBaseDAO.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/CommonFenceStoreDataBaseDAO.java index 589f1e0a323..3ea8148d354 100644 --- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/CommonFenceStoreDataBaseDAO.java +++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/CommonFenceStoreDataBaseDAO.java @@ -43,13 +43,12 @@ */ public class CommonFenceStoreDataBaseDAO implements CommonFenceStore { + private static volatile CommonFenceStoreDataBaseDAO instance = null; /** * Common fence log table name */ private String logTableName = DefaultValues.DEFAULT_COMMON_FENCE_LOG_TABLE_NAME; - private static volatile CommonFenceStoreDataBaseDAO instance = null; - private CommonFenceStoreDataBaseDAO() {} public static CommonFenceStore getInstance() { diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/sql/CommonFenceStoreSqls.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/sql/CommonFenceStoreSqls.java index 842202a2366..3b590151f85 100644 --- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/sql/CommonFenceStoreSqls.java +++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/fence/store/db/sql/CommonFenceStoreSqls.java @@ -24,10 +24,6 @@ */ public class CommonFenceStoreSqls { - private CommonFenceStoreSqls() { - throw new IllegalStateException("Utility class"); - } - /** * The constant LOCAL_TCC_LOG_PLACEHOLD. */ @@ -46,21 +42,27 @@ private CommonFenceStoreSqls() { + " (xid, branch_id, action_name, status, gmt_create, gmt_modified) " + " values (?, ?, ?, ?, ?, ?) "; + /** + * The constant QUERY_END_STATUS_BY_DATE. + */ + protected static final String QUERY_END_STATUS_BY_DATE = "select xid, branch_id, status, gmt_create, gmt_modified " + + "from " + LOCAL_TCC_LOG_PLACEHOLD + + " where gmt_modified < ? " + + " and status in (" + CommonFenceConstant.STATUS_COMMITTED + " , " + CommonFenceConstant.STATUS_ROLLBACKED + " , " + CommonFenceConstant.STATUS_SUSPENDED + ")" + + " limit ?"; + /** * The constant QUERY_BY_BRANCH_ID_AND_XID. */ protected static final String QUERY_BY_BRANCH_ID_AND_XID = "select xid, branch_id, status, gmt_create, gmt_modified " + "from " + LOCAL_TCC_LOG_PLACEHOLD + " where xid = ? and branch_id = ? for update"; - /** - * The constant QUERY_END_STATUS_BY_DATE. + * The constant DELETE_BY_DATE_AND_STATUS. */ - protected static final String QUERY_END_STATUS_BY_DATE = "select xid, branch_id, status, gmt_create, gmt_modified " - + "from " + LOCAL_TCC_LOG_PLACEHOLD - + " where gmt_modified < ? " - + " and status in (" + CommonFenceConstant.STATUS_COMMITTED + " , " + CommonFenceConstant.STATUS_ROLLBACKED + " , " + CommonFenceConstant.STATUS_SUSPENDED + ")" - + " limit ?"; + protected static final String DELETE_BY_DATE_AND_STATUS = "delete from " + LOCAL_TCC_LOG_PLACEHOLD + + " where gmt_modified < ? " + + " and status in (" + CommonFenceConstant.STATUS_COMMITTED + " , " + CommonFenceConstant.STATUS_ROLLBACKED + " , " + CommonFenceConstant.STATUS_SUSPENDED + ")"; /** * The constant UPDATE_STATUS_BY_BRANCH_ID_AND_XID. @@ -79,12 +81,9 @@ private CommonFenceStoreSqls() { protected static final String DELETE_BY_BRANCH_XIDS = "delete from " + LOCAL_TCC_LOG_PLACEHOLD + " where xid in (" + PRAMETER_PLACEHOLD + ")"; - /** - * The constant DELETE_BY_DATE_AND_STATUS. - */ - protected static final String DELETE_BY_DATE_AND_STATUS = "delete from " + LOCAL_TCC_LOG_PLACEHOLD - + " where gmt_modified < ? " - + " and status in (" + CommonFenceConstant.STATUS_COMMITTED + " , " + CommonFenceConstant.STATUS_ROLLBACKED + " , " + CommonFenceConstant.STATUS_SUSPENDED + ")"; + private CommonFenceStoreSqls() { + throw new IllegalStateException("Utility class"); + } public static String getInsertLocalTCCLogSQL(String localTccTable) { return INSERT_LOCAL_TCC_LOG.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable); diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java index 525d4c9392b..f389077de39 100644 --- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java +++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/handler/GlobalTransactionalInterceptorHandler.java @@ -72,43 +72,21 @@ public class GlobalTransactionalInterceptorHandler extends AbstractProxyInvocati private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate(); private final GlobalLockTemplate globalLockTemplate = new GlobalLockTemplate(); - - private Set methodsToProxy; + private static int degradeCheckAllowTimes; private volatile boolean disable; private static final AtomicBoolean ATOMIC_DEGRADE_CHECK = new AtomicBoolean(false); private static volatile Integer degradeNum = 0; private static volatile Integer reachNum = 0; - private static int degradeCheckAllowTimes; - protected AspectTransactional aspectTransactional; private static int degradeCheckPeriod; - private static int defaultGlobalTransactionTimeout = 0; - private final FailureHandler failureHandler; + protected AspectTransactional aspectTransactional; + private Set methodsToProxy; private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true); private static volatile ScheduledThreadPoolExecutor executor; - private void initDefaultGlobalTransactionTimeout() { - if (GlobalTransactionalInterceptorHandler.defaultGlobalTransactionTimeout <= 0) { - int defaultGlobalTransactionTimeout; - try { - defaultGlobalTransactionTimeout = ConfigurationFactory.getInstance().getInt( - ConfigurationKeys.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT); - } catch (Exception e) { - LOGGER.error("Illegal global transaction timeout value: " + e.getMessage()); - defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT; - } - if (defaultGlobalTransactionTimeout <= 0) { - LOGGER.warn("Global transaction timeout value '{}' is illegal, and has been reset to the default value '{}'", - defaultGlobalTransactionTimeout, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT); - defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT; - } - GlobalTransactionalInterceptorHandler.defaultGlobalTransactionTimeout = defaultGlobalTransactionTimeout; - } - } - public GlobalTransactionalInterceptorHandler(FailureHandler failureHandler, Set methodsToProxy) { this.failureHandler = failureHandler == null ? FailureHandlerHolder.getFailureHandler() : failureHandler; this.methodsToProxy = methodsToProxy; @@ -134,6 +112,25 @@ public GlobalTransactionalInterceptorHandler(FailureHandler failureHandler, Set< this.aspectTransactional = aspectTransactional; } + private void initDefaultGlobalTransactionTimeout() { + if (GlobalTransactionalInterceptorHandler.defaultGlobalTransactionTimeout <= 0) { + int defaultGlobalTransactionTimeout; + try { + defaultGlobalTransactionTimeout = ConfigurationFactory.getInstance().getInt( + ConfigurationKeys.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT); + } catch (Exception e) { + LOGGER.error("Illegal global transaction timeout value: " + e.getMessage()); + defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT; + } + if (defaultGlobalTransactionTimeout <= 0) { + LOGGER.warn("Global transaction timeout value '{}' is illegal, and has been reset to the default value '{}'", + defaultGlobalTransactionTimeout, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT); + defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT; + } + GlobalTransactionalInterceptorHandler.defaultGlobalTransactionTimeout = defaultGlobalTransactionTimeout; + } + } + @Override public Set getMethodsToProxy() { return methodsToProxy; diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultInterfaceParser.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultInterfaceParser.java index c47b361e97b..3200de44dae 100644 --- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultInterfaceParser.java +++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultInterfaceParser.java @@ -31,18 +31,14 @@ public class DefaultInterfaceParser implements InterfaceParser { protected static final List ALL_INTERFACE_PARSERS = new ArrayList<>(); - private static class SingletonHolder { - private static final DefaultInterfaceParser INSTANCE = new DefaultInterfaceParser(); + protected DefaultInterfaceParser() { + initInterfaceParser(); } public static DefaultInterfaceParser get() { return DefaultInterfaceParser.SingletonHolder.INSTANCE; } - protected DefaultInterfaceParser() { - initInterfaceParser(); - } - /** * init parsers */ @@ -64,4 +60,8 @@ public ProxyInvocationHandler parserInterfaceToProxy(Object target) throws Excep return null; } + private static class SingletonHolder { + private static final DefaultInterfaceParser INSTANCE = new DefaultInterfaceParser(); + } + } \ No newline at end of file diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultResourceRegisterParser.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultResourceRegisterParser.java index 8e9451c2982..5202243c721 100644 --- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultResourceRegisterParser.java +++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultResourceRegisterParser.java @@ -29,22 +29,18 @@ public class DefaultResourceRegisterParser { protected static List allRegisterResourceParsers = new ArrayList<>(); - public void registerResource(Object target) { - for (RegisterResourceParser registerResourceParser : allRegisterResourceParsers) { - registerResourceParser.registerResource(target); - } - } - - private static class SingletonHolder { - private static final DefaultResourceRegisterParser INSTANCE = new DefaultResourceRegisterParser(); + protected DefaultResourceRegisterParser() { + initResourceRegisterParser(); } public static DefaultResourceRegisterParser get() { return DefaultResourceRegisterParser.SingletonHolder.INSTANCE; } - protected DefaultResourceRegisterParser() { - initResourceRegisterParser(); + public void registerResource(Object target) { + for (RegisterResourceParser registerResourceParser : allRegisterResourceParsers) { + registerResourceParser.registerResource(target); + } } /** @@ -57,5 +53,9 @@ protected void initResourceRegisterParser() { } } + private static class SingletonHolder { + private static final DefaultResourceRegisterParser INSTANCE = new DefaultResourceRegisterParser(); + } + } \ No newline at end of file diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultTargetClassParser.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultTargetClassParser.java index 502a39138c8..84bbf7b6c78 100644 --- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultTargetClassParser.java +++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/interceptor/parser/DefaultTargetClassParser.java @@ -30,18 +30,14 @@ public class DefaultTargetClassParser implements TargetClassParser { protected static List allTargetClassParsers = new ArrayList<>(); - private static class SingletonHolder { - private static final DefaultTargetClassParser INSTANCE = new DefaultTargetClassParser(); + protected DefaultTargetClassParser() { + initTargetClassParser(); } public static DefaultTargetClassParser get() { return DefaultTargetClassParser.SingletonHolder.INSTANCE; } - protected DefaultTargetClassParser() { - initTargetClassParser(); - } - /** * init parsers */ @@ -73,4 +69,8 @@ public Class[] findInterfaces(Object target) throws Exception { } return target.getClass().getInterfaces(); } + + private static class SingletonHolder { + private static final DefaultTargetClassParser INSTANCE = new DefaultTargetClassParser(); + } } diff --git a/integration-tx-api/src/main/java/io/seata/integration/tx/api/json/DefaultJsonParser.java b/integration-tx-api/src/main/java/io/seata/integration/tx/api/json/DefaultJsonParser.java index cc5a450538a..a82944215ec 100644 --- a/integration-tx-api/src/main/java/io/seata/integration/tx/api/json/DefaultJsonParser.java +++ b/integration-tx-api/src/main/java/io/seata/integration/tx/api/json/DefaultJsonParser.java @@ -31,10 +31,6 @@ public class DefaultJsonParser implements JsonParser { protected static List allJsonParsers = new ArrayList<>(); - private static class SingletonHolder { - private static final DefaultJsonParser INSTANCE = new DefaultJsonParser(); - } - private DefaultJsonParser() { initJsonParser(); } @@ -73,4 +69,8 @@ public T parseObject(String text, Class clazz) { return null; } + private static class SingletonHolder { + private static final DefaultJsonParser INSTANCE = new DefaultJsonParser(); + } + } diff --git a/script/client/spring/application.properties b/script/client/spring/application.properties index 662b4de3ced..3fcdc985a56 100755 --- a/script/client/spring/application.properties +++ b/script/client/spring/application.properties @@ -15,6 +15,7 @@ # seata.enabled=true +seata.rocketmq-enabled=false seata.scan-packages=firstPackage,secondPackage seata.excludes-for-scanning=firstBeanNameForExclude,secondBeanNameForExclude seata.excludes-for-auto-proxying=firstClassNameForExclude,secondClassNameForExclude diff --git a/script/client/spring/application.yml b/script/client/spring/application.yml index 9a62f299e30..53f11055979 100755 --- a/script/client/spring/application.yml +++ b/script/client/spring/application.yml @@ -1,5 +1,6 @@ seata: enabled: true + rocketmq-enabled: false application-id: applicationName tx-service-group: default_tx_group access-key: aliyunAccessKey diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/java/io/seata/spring/boot/autoconfigure/properties/SeataProperties.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/java/io/seata/spring/boot/autoconfigure/properties/SeataProperties.java index a93dd819782..fcf9cc17291 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/java/io/seata/spring/boot/autoconfigure/properties/SeataProperties.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/java/io/seata/spring/boot/autoconfigure/properties/SeataProperties.java @@ -32,6 +32,10 @@ public class SeataProperties { * whether enable auto configuration */ private boolean enabled = true; + /** + * whether enable rocketmq integrate + */ + private boolean rocketmqEnabled = false; /** * application id */ @@ -87,6 +91,14 @@ public SeataProperties setEnabled(boolean enabled) { return this; } + public boolean isRocketmqEnabled() { + return rocketmqEnabled; + } + + public void setRocketmqEnabled(boolean rocketmqEnabled) { + this.rocketmqEnabled = rocketmqEnabled; + } + public String getApplicationId() { if (applicationId == null) { applicationId = springCloudAlibabaConfiguration.getApplicationId(); diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 0b20cfc4f20..191beb7c318 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-client/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -13,6 +13,13 @@ "sourceType": "io.seata.spring.boot.autoconfigure.properties.SeataProperties", "defaultValue": "AT" }, + { + "name": "seata.rocketmq-enabled", + "type": "java.lang.Boolean", + "sourceType": "io.seata.spring.boot.autoconfigure.properties.SeataProperties", + "description": "Enable seata integrate rocketmq", + "defaultValue": false + }, { "name": "spring.cloud.alibaba.seata.application-id", "type": "java.lang.String", diff --git a/seata-spring-boot-starter/pom.xml b/seata-spring-boot-starter/pom.xml index ec144db0d90..2f305c3cd47 100644 --- a/seata-spring-boot-starter/pom.xml +++ b/seata-spring-boot-starter/pom.xml @@ -63,6 +63,13 @@ spring-boot-configuration-processor true + + + org.apache.rocketmq + rocketmq-client + true + provided + diff --git a/seata-spring-boot-starter/src/main/java/io/seata/spring/boot/autoconfigure/SeataRocketMQAutoConfiguration.java b/seata-spring-boot-starter/src/main/java/io/seata/spring/boot/autoconfigure/SeataRocketMQAutoConfiguration.java new file mode 100644 index 00000000000..4c71521ef38 --- /dev/null +++ b/seata-spring-boot-starter/src/main/java/io/seata/spring/boot/autoconfigure/SeataRocketMQAutoConfiguration.java @@ -0,0 +1,43 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.spring.boot.autoconfigure; + +import io.seata.rm.tcc.rocketmq.RocketMQAspect; +import io.seata.rm.tcc.rocketmq.TCCRocketMQ; +import io.seata.rm.tcc.rocketmq.TCCRocketMQImpl; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; + +@ConditionalOnClass(name = "org.apache.rocketmq.client.producer.DefaultMQProducer") +@ConditionalOnBean(DefaultMQProducer.class) +@ConditionalOnExpression("${seata.enabled:true} && ${seata.rocketmq-enabled:false}") +public class SeataRocketMQAutoConfiguration { + + @Bean + @ConditionalOnMissingBean + public TCCRocketMQ tccRocketMQ() { + return new TCCRocketMQImpl(); + } + + @Bean + public RocketMQAspect rocketMQAspect(TCCRocketMQ tccRocketMQ) { + return new RocketMQAspect(tccRocketMQ); + } +} \ No newline at end of file diff --git a/seata-spring-boot-starter/src/main/resources/META-INF/spring.factories b/seata-spring-boot-starter/src/main/resources/META-INF/spring.factories index b83ade816ef..382daf032dc 100644 --- a/seata-spring-boot-starter/src/main/resources/META-INF/spring.factories +++ b/seata-spring-boot-starter/src/main/resources/META-INF/spring.factories @@ -3,4 +3,5 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\ io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\ io.seata.spring.boot.autoconfigure.SeataHttpAutoConfiguration,\ -io.seata.spring.boot.autoconfigure.SeataSagaAutoConfiguration +io.seata.spring.boot.autoconfigure.SeataSagaAutoConfiguration,\ +io.seata.spring.boot.autoconfigure.SeataRocketMQAutoConfiguration diff --git a/spring/src/main/java/io/seata/rm/fence/SpringFenceHandler.java b/spring/src/main/java/io/seata/rm/fence/SpringFenceHandler.java index 8e6669018cb..ea39c66dc09 100644 --- a/spring/src/main/java/io/seata/rm/fence/SpringFenceHandler.java +++ b/spring/src/main/java/io/seata/rm/fence/SpringFenceHandler.java @@ -96,6 +96,78 @@ public static void setTransactionTemplate(TransactionTemplate transactionTemplat SpringFenceHandler.transactionTemplate = transactionTemplate; } + /** + * Insert Common fence log + * + * @param conn the db connection + * @param xid the xid + * @param branchId the branchId + * @param status the status + * @return the boolean + */ + private static boolean insertCommonFenceLog(Connection conn, String xid, Long branchId, String actionName, Integer status) { + CommonFenceDO commonFenceDO = new CommonFenceDO(); + commonFenceDO.setXid(xid); + commonFenceDO.setBranchId(branchId); + commonFenceDO.setActionName(actionName); + commonFenceDO.setStatus(status); + return COMMON_FENCE_DAO.insertCommonFenceDO(conn, commonFenceDO); + } + + /** + * Update TCC Fence status and invoke target method + * + * @param method target method + * @param targetTCCBean target bean + * @param xid the global transaction id + * @param branchId the branch transaction id + * @param status the tcc fence status + * @return the boolean + */ + private static boolean updateStatusAndInvokeTargetMethod(Connection conn, Method method, Object targetTCCBean, + String xid, Long branchId, int status, + TransactionStatus transactionStatus, + Object[] args) throws Exception { + boolean result = COMMON_FENCE_DAO.updateCommonFenceDO(conn, xid, branchId, status, CommonFenceConstant.STATUS_TRIED); + if (result) { + // invoke two phase method + Object ret = method.invoke(targetTCCBean, args); + if (null != ret) { + if (ret instanceof TwoPhaseResult) { + result = ((TwoPhaseResult) ret).isSuccess(); + } else { + result = (boolean) ret; + } + // If the business execution result is false, the transaction will be rolled back + if (!result) { + transactionStatus.setRollbackOnly(); + } + } + } + return result; + } + + /** + * Delete Common Fence + * + * @param xid the global transaction id + * @param branchId the branch transaction id + * @return the boolean + */ + public static boolean deleteFence(String xid, Long branchId) { + return transactionTemplate.execute(status -> { + boolean ret = false; + try { + Connection conn = DataSourceUtils.getConnection(dataSource); + ret = COMMON_FENCE_DAO.deleteCommonFenceDO(conn, xid, branchId); + } catch (RuntimeException e) { + status.setRollbackOnly(); + LOGGER.error("delete fence log failed, xid: {}, branchId: {}", xid, branchId, e); + } + return ret; + }); + } + /** * common prepare method enhanced * @@ -218,78 +290,6 @@ public boolean rollbackFence(Method rollbackMethod, Object targetTCCBean, }); } - /** - * Insert Common fence log - * - * @param conn the db connection - * @param xid the xid - * @param branchId the branchId - * @param status the status - * @return the boolean - */ - private static boolean insertCommonFenceLog(Connection conn, String xid, Long branchId, String actionName, Integer status) { - CommonFenceDO commonFenceDO = new CommonFenceDO(); - commonFenceDO.setXid(xid); - commonFenceDO.setBranchId(branchId); - commonFenceDO.setActionName(actionName); - commonFenceDO.setStatus(status); - return COMMON_FENCE_DAO.insertCommonFenceDO(conn, commonFenceDO); - } - - /** - * Update TCC Fence status and invoke target method - * - * @param method target method - * @param targetTCCBean target bean - * @param xid the global transaction id - * @param branchId the branch transaction id - * @param status the tcc fence status - * @return the boolean - */ - private static boolean updateStatusAndInvokeTargetMethod(Connection conn, Method method, Object targetTCCBean, - String xid, Long branchId, int status, - TransactionStatus transactionStatus, - Object[] args) throws Exception { - boolean result = COMMON_FENCE_DAO.updateCommonFenceDO(conn, xid, branchId, status, CommonFenceConstant.STATUS_TRIED); - if (result) { - // invoke two phase method - Object ret = method.invoke(targetTCCBean, args); - if (null != ret) { - if (ret instanceof TwoPhaseResult) { - result = ((TwoPhaseResult) ret).isSuccess(); - } else { - result = (boolean) ret; - } - // If the business execution result is false, the transaction will be rolled back - if (!result) { - transactionStatus.setRollbackOnly(); - } - } - } - return result; - } - - /** - * Delete Common Fence - * - * @param xid the global transaction id - * @param branchId the branch transaction id - * @return the boolean - */ - public static boolean deleteFence(String xid, Long branchId) { - return transactionTemplate.execute(status -> { - boolean ret = false; - try { - Connection conn = DataSourceUtils.getConnection(dataSource); - ret = COMMON_FENCE_DAO.deleteCommonFenceDO(conn, xid, branchId); - } catch (RuntimeException e) { - status.setRollbackOnly(); - LOGGER.error("delete fence log failed, xid: {}, branchId: {}", xid, branchId, e); - } - return ret; - }); - } - /** * Delete Common Fence By Datetime * diff --git a/spring/src/main/java/io/seata/spring/annotation/EnableRocketMQAspect.java b/spring/src/main/java/io/seata/spring/annotation/EnableRocketMQAspect.java new file mode 100644 index 00000000000..daf59301fda --- /dev/null +++ b/spring/src/main/java/io/seata/spring/annotation/EnableRocketMQAspect.java @@ -0,0 +1,32 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.spring.annotation; + +import io.seata.rm.tcc.rocketmq.RocketMQAspect; +import io.seata.rm.tcc.rocketmq.TCCRocketMQImpl; +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import org.springframework.context.annotation.Import; + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Import({RocketMQAspect.class, TCCRocketMQImpl.class}) +public @interface EnableRocketMQAspect { +} \ No newline at end of file diff --git a/spring/src/main/java/io/seata/spring/annotation/scannercheckers/PackageScannerChecker.java b/spring/src/main/java/io/seata/spring/annotation/scannercheckers/PackageScannerChecker.java index 30c849be5de..86cd021b3dc 100644 --- a/spring/src/main/java/io/seata/spring/annotation/scannercheckers/PackageScannerChecker.java +++ b/spring/src/main/java/io/seata/spring/annotation/scannercheckers/PackageScannerChecker.java @@ -58,7 +58,7 @@ public static void addScannablePackages(String... packages) { @Override public boolean check(Object bean, String beanName, @Nullable ConfigurableListableBeanFactory beanFactory) throws Exception { - if (SCANNABLE_PACKAGE_SET.isEmpty()) { + if (SCANNABLE_PACKAGE_SET.isEmpty() || bean.getClass().getName().startsWith("io.seata")) { // if empty, pass this checker return true; } diff --git a/tcc/pom.xml b/tcc/pom.xml index a9f9e847841..3f36abb01b7 100644 --- a/tcc/pom.xml +++ b/tcc/pom.xml @@ -52,6 +52,14 @@ com.alibaba fastjson + + + + org.apache.rocketmq + rocketmq-client + true + provided + ${project.groupId} seata-integration-tx-api diff --git a/tcc/src/main/java/io/seata/rm/tcc/rocketmq/RocketMQAspect.java b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/RocketMQAspect.java new file mode 100644 index 00000000000..7619dfa1d05 --- /dev/null +++ b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/RocketMQAspect.java @@ -0,0 +1,48 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.rm.tcc.rocketmq; + +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; + +public class RocketMQAspect implements BeanPostProcessor { + + public static Logger LOGGER = LoggerFactory.getLogger(RocketMQAspect.class); + + private final TCCRocketMQ tccRocketMQ; + + public RocketMQAspect(TCCRocketMQ tccRocketMQ) { + this.tccRocketMQ = tccRocketMQ; + } + + @Override + public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + return bean; + } + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + if (bean instanceof DefaultMQProducer) { + LOGGER.info("Generate RocketMQ Producer Proxy"); + tccRocketMQ.setDefaultMQProducer((DefaultMQProducer) bean); + return new SeataMQProducer((DefaultMQProducer) bean, tccRocketMQ); + } + return bean; + } +} diff --git a/tcc/src/main/java/io/seata/rm/tcc/rocketmq/RocketMQUtils.java b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/RocketMQUtils.java new file mode 100644 index 00000000000..9103225719a --- /dev/null +++ b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/RocketMQUtils.java @@ -0,0 +1,82 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.rm.tcc.rocketmq; + +import java.net.UnknownHostException; +import org.apache.rocketmq.client.Validators; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.remoting.exception.RemotingException; + +public class RocketMQUtils { + + public static SendResult halfSend(DefaultMQProducer defaultMQProducer, + Message msg) throws MQClientException { + // ignore DelayTimeLevel parameter + if (msg.getDelayTimeLevel() != 0) { + MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); + } + + Validators.checkMessage(msg, defaultMQProducer); + + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, defaultMQProducer.getProducerGroup()); + SendResult sendResult; + try { + sendResult = defaultMQProducer.send(msg); + } catch (Exception e) { + throw new MQClientException("send message Exception", e); + } + + switch (sendResult.getSendStatus()) { + case SEND_OK: { + if (sendResult.getTransactionId() != null) { + msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); + } + String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); + if (null != transactionId && !"".equals(transactionId)) { + msg.setTransactionId(transactionId); + } + } + break; + case FLUSH_DISK_TIMEOUT: + case FLUSH_SLAVE_TIMEOUT: + case SLAVE_NOT_AVAILABLE: + default: + throw new RuntimeException("Message send fail."); + } + return sendResult; + } + + public static void confirm(DefaultMQProducer defaultMQProducer, Message msg, + SendResult sendResult) throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException { + DefaultMQProducerImpl defaultMQProducerImpl = defaultMQProducer.getDefaultMQProducerImpl(); + defaultMQProducerImpl.endTransaction(msg, sendResult, LocalTransactionState.COMMIT_MESSAGE, null); + } + + public static void cancel(DefaultMQProducer defaultMQProducer, Message msg, + SendResult sendResult) throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException { + DefaultMQProducerImpl defaultMQProducerImpl = defaultMQProducer.getDefaultMQProducerImpl(); + defaultMQProducerImpl.endTransaction(msg, sendResult, LocalTransactionState.ROLLBACK_MESSAGE, null); + } +} diff --git a/tcc/src/main/java/io/seata/rm/tcc/rocketmq/SeataMQProducer.java b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/SeataMQProducer.java new file mode 100644 index 00000000000..3c08c3801c1 --- /dev/null +++ b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/SeataMQProducer.java @@ -0,0 +1,303 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.rm.tcc.rocketmq; + +import io.seata.core.context.RootContext; +import java.util.Collection; +import java.util.List; +import org.apache.rocketmq.client.QueryResult; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.exception.RequestTimeoutException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.MQProducer; +import org.apache.rocketmq.client.producer.MessageQueueSelector; +import org.apache.rocketmq.client.producer.RequestCallback; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.TransactionSendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SeataMQProducer implements MQProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(SeataMQProducer.class); + + private final DefaultMQProducer target; + private final TCCRocketMQ tccRocketMQ; + + public SeataMQProducer(DefaultMQProducer target, TCCRocketMQ tccRocketMQ) { + this.target = target; + this.tccRocketMQ = tccRocketMQ; + } + + @Override + public void start() throws MQClientException { + target.start(); + } + + @Override + public void shutdown() { + target.shutdown(); + } + + @Override + public List fetchPublishMessageQueues(String topic) throws MQClientException { + return target.fetchPublishMessageQueues(topic); + } + + @Override + public SendResult send( + Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + if (RootContext.inGlobalTransaction()) { + LOGGER.info("DefaultMQProducer send is in Global Transaction, send() will be proxy"); + return tccRocketMQ.prepare(null, msg); + } else { + LOGGER.info("Not in Global Transaction, send() will be proxy"); + return target.send(msg); + } + } + + @Override + public SendResult send(Message msg, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return target.send(msg, timeout); + } + + @Override + public void send(Message msg, + SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + target.send(msg, sendCallback); + } + + @Override + public void send(Message msg, SendCallback sendCallback, + long timeout) throws MQClientException, RemotingException, InterruptedException { + target.send(msg, sendCallback, timeout); + } + + @Override + public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { + target.sendOneway(msg); + } + + @Override + public SendResult send(Message msg, + MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return target.send(msg, mq); + } + + @Override + public SendResult send(Message msg, MessageQueue mq, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return target.send(msg, mq, timeout); + } + + @Override + public void send(Message msg, MessageQueue mq, + SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + target.send(msg, mq, sendCallback); + } + + @Override + public void send(Message msg, MessageQueue mq, SendCallback sendCallback, + long timeout) throws MQClientException, RemotingException, InterruptedException { + target.send(msg, mq, sendCallback, timeout); + } + + @Override + public void sendOneway(Message msg, + MessageQueue mq) throws MQClientException, RemotingException, InterruptedException { + target.sendOneway(msg, mq); + } + + @Override + public SendResult send(Message msg, MessageQueueSelector selector, + Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return target.send(msg, selector, arg); + } + + @Override + public SendResult send(Message msg, MessageQueueSelector selector, Object arg, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return target.send(msg, selector, arg, timeout); + } + + @Override + public void send(Message msg, MessageQueueSelector selector, Object arg, + SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { + target.send(msg, selector, arg, sendCallback); + } + + @Override + public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, + long timeout) throws MQClientException, RemotingException, InterruptedException { + target.send(msg, selector, arg, sendCallback, timeout); + } + + @Override + public Message request(Message msg, + long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException { + return target.request(msg, timeout); + } + + @Override + public void request(Message msg, RequestCallback requestCallback, + long timeout) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { + target.request(msg, requestCallback, timeout); + } + + @Override + public Message request(Message msg, MessageQueueSelector selector, Object arg, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException { + return target.request(msg, selector, arg, timeout); + } + + @Override + public void request(Message msg, MessageQueueSelector selector, Object arg, RequestCallback requestCallback, + long timeout) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { + target.request(msg, selector, arg, requestCallback, timeout); + } + + @Override + public Message request(Message msg, MessageQueue mq, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException { + return target.request(msg, mq, timeout); + } + + @Override + public void request(Message msg, MessageQueue mq, RequestCallback requestCallback, + long timeout) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { + target.request(msg, mq, requestCallback, timeout); + } + + @Override + public void sendOneway(Message msg, MessageQueueSelector selector, + Object arg) throws MQClientException, RemotingException, InterruptedException { + target.sendOneway(msg, selector, arg); + } + + @Override + public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, + Object arg) throws MQClientException { + return target.sendMessageInTransaction(msg, tranExecuter, arg); + } + + @Override + public TransactionSendResult sendMessageInTransaction(Message msg, Object arg) throws MQClientException { + return target.sendMessageInTransaction(msg, arg); + } + + @Override + public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { + target.createTopic(key, newTopic, queueNum); + } + + @Override + public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException { + target.createTopic(key, newTopic, queueNum, topicSysFlag); + } + + @Override + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + return target.searchOffset(mq, timestamp); + } + + @Override + public long maxOffset(MessageQueue mq) throws MQClientException { + return target.maxOffset(mq); + } + + @Override + public long minOffset(MessageQueue mq) throws MQClientException { + return target.minOffset(mq); + } + + @Override + public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException { + return target.earliestMsgStoreTime(mq); + } + + @Override + public MessageExt viewMessage( + String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return target.viewMessage(offsetMsgId); + } + + @Override + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, + long end) throws MQClientException, InterruptedException { + return target.queryMessage(topic, key, maxNum, begin, end); + } + + @Override + public MessageExt viewMessage(String topic, + String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + return target.viewMessage(topic, msgId); + } + + @Override + public SendResult send( + Collection msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return target.send(msgs); + } + + @Override + public SendResult send(Collection msgs, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return target.send(msgs, timeout); + } + + @Override + public SendResult send(Collection msgs, + MessageQueue messageQueue) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return target.send(msgs, messageQueue); + } + + @Override + public SendResult send(Collection msgs, MessageQueue messageQueue, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return target.send(msgs, messageQueue, timeout); + } + + @Override + public void send(Collection msgs, + SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + target.send(msgs, sendCallback); + } + + @Override + public void send(Collection msgs, SendCallback sendCallback, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + target.send(msgs, sendCallback, timeout); + } + + @Override + public void send(Collection msgs, MessageQueue mq, + SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + target.send(msgs, mq, sendCallback); + } + + @Override + public void send(Collection msgs, MessageQueue mq, SendCallback sendCallback, + long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + target.send(msgs, mq, sendCallback, timeout); + } +} diff --git a/tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQ.java b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQ.java new file mode 100644 index 00000000000..56122fba9b7 --- /dev/null +++ b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQ.java @@ -0,0 +1,77 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.rm.tcc.rocketmq; + +import io.seata.rm.tcc.api.BusinessActionContext; +import io.seata.rm.tcc.api.LocalTCC; +import io.seata.rm.tcc.api.TwoPhaseBusinessAction; +import java.net.UnknownHostException; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; + +@LocalTCC +public interface TCCRocketMQ { + + void setDefaultMQProducer(DefaultMQProducer producer); + + /** + * RocketMQ half send + * + * @param context thre context + * @param message the message + * @return SendResult + * @throws MQBrokerException + * @throws RemotingException + * @throws NoSuchFieldException + * @throws InterruptedException + * @throws MQClientException + */ + @TwoPhaseBusinessAction(name = "tccRocketMQ", commitMethod = "commit", rollbackMethod = "rollback") + SendResult prepare(BusinessActionContext context, Message message) + throws MQBrokerException, RemotingException, InterruptedException, MQClientException; + + /** + * RocketMQ half send commit + * + * @param context the BusinessActionContext + * @return SendResult + * @throws UnknownHostException + * @throws MQBrokerException + * @throws RemotingException + * @throws NoSuchFieldException + * @throws InterruptedException + */ + boolean commit(BusinessActionContext context) + throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException; + + /** + * RocketMQ half send rollback + * + * @param context the BusinessActionContext + * @return + * @throws UnknownHostException + * @throws MQBrokerException + * @throws RemotingException + * @throws NoSuchFieldException + * @throws InterruptedException + */ + boolean rollback(BusinessActionContext context) + throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException; +} diff --git a/tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQImpl.java b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQImpl.java new file mode 100644 index 00000000000..177e6313655 --- /dev/null +++ b/tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQImpl.java @@ -0,0 +1,74 @@ +/* + * Copyright 1999-2019 Seata.io Group. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.rm.tcc.rocketmq; + +import io.seata.rm.tcc.api.BusinessActionContext; +import io.seata.rm.tcc.api.BusinessActionContextUtil; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TCCRocketMQImpl implements TCCRocketMQ { + private static final Logger LOGGER = LoggerFactory.getLogger(TCCRocketMQImpl.class); + + private DefaultMQProducer defaultMQProducer; + + @Override + public void setDefaultMQProducer(DefaultMQProducer defaultMQProducer) { + this.defaultMQProducer = defaultMQProducer; + } + + @Override + public SendResult prepare(BusinessActionContext context, Message message) + throws MQBrokerException, RemotingException, InterruptedException, MQClientException { + SendResult sendResult = RocketMQUtils.halfSend(defaultMQProducer, message); + LOGGER.info("RocketMQ message send prepare, xid = {}, bid = {}", context.getXid(), context.getBranchId()); + Map params = new HashMap<>(2); + params.put("message", message); + params.put("sendResult", sendResult); + BusinessActionContextUtil.addContext(params); + return sendResult; + } + + @Override + public boolean commit(BusinessActionContext context) + throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException { + Message message = context.getActionContext("message", Message.class); + SendResult sendResult = context.getActionContext("sendResult", SendResult.class); + RocketMQUtils.confirm(defaultMQProducer, message, sendResult); + LOGGER.info("RocketMQ message send commit, xid = {}, branchId = {}", context.getXid(), context.getBranchId()); + return true; + } + + @Override + public boolean rollback(BusinessActionContext context) + throws UnknownHostException, MQBrokerException, RemotingException, InterruptedException { + Message message = context.getActionContext("message", Message.class); + SendResult sendResult = context.getActionContext("sendResult", SendResult.class); + RocketMQUtils.cancel(defaultMQProducer, message, sendResult); + LOGGER.info("RocketMQ message send rollback, xid = {}, branchId = {}", context.getXid(), context.getBranchId()); + return true; + } + +} diff --git a/tm/src/main/java/io/seata/tm/api/FailureHandlerHolder.java b/tm/src/main/java/io/seata/tm/api/FailureHandlerHolder.java index effbc93fec9..65af1efefe7 100644 --- a/tm/src/main/java/io/seata/tm/api/FailureHandlerHolder.java +++ b/tm/src/main/java/io/seata/tm/api/FailureHandlerHolder.java @@ -23,14 +23,14 @@ public class FailureHandlerHolder { private static FailureHandler FAILURE_HANDLER_HOLDER = new DefaultFailureHandlerImpl(); + public static FailureHandler getFailureHandler() { + return FAILURE_HANDLER_HOLDER; + } + public static void setFailureHandler(FailureHandler failureHandler) { if (failureHandler != null) { FAILURE_HANDLER_HOLDER = failureHandler; } } - public static FailureHandler getFailureHandler() { - return FAILURE_HANDLER_HOLDER; - } - }