-
Notifications
You must be signed in to change notification settings - Fork 8.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature: Integrate RocketMQ into Seata #3974
Conversation
Codecov Report
📣 This organization is not using Codecov’s GitHub App Integration. We recommend you install it so Codecov can continue to function properly for your repositories. Learn more @@ Coverage Diff @@
## 2.x #3974 +/- ##
============================================
- Coverage 48.42% 48.20% -0.23%
+ Complexity 4171 4168 -3
============================================
Files 764 769 +5
Lines 26892 26979 +87
Branches 3348 3348
============================================
- Hits 13023 13004 -19
- Misses 12467 12591 +124
+ Partials 1402 1384 -18
|
…re-tcc-rocketmq # Conflicts: # tcc/pom.xml # tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQ.java # tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQImpl.java
…re-tcc-rocketmq # Conflicts: # seata-spring-boot-starter/src/main/java/io/seata/spring/boot/autoconfigure/RocketMQAutoConfiguration.java
...boot-starter/src/main/java/io/seata/spring/boot/autoconfigure/RocketMQAutoConfiguration.java
Outdated
Show resolved
Hide resolved
SendResult sendResult = RocketMQUtils.halfSend(defaultMQProducer, message); | ||
LOGGER.info("RocketMQ message send prepare, xid = {}, bid = {}", context.getXid(), context.getBranchId()); | ||
Map<String, Object> params = new HashMap<>(2); | ||
params.put("message", message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
没必要传递msg
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I will fix it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
如果只是为了后面endtransaction,这里是不用保持mesage的,sendresult中的信息足够。 但是如果MQ的客户端设置了EndTransactionHook,内部实现回查message中获取TransactionId, 如果没有message传入,应该会抛异常出来。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ShannonDing 这种情况将有后续seata独立的seatadefaultproducer来处理,不会设置EndTransactionHook
# Conflicts: # seata-spring-boot-starter/src/main/resources/META-INF/spring.factories
Aop is based on dynamic proxy, the middleware layer should use the underlying technology |
|
再补充一下测试用例 |
@Override | ||
public SendResult prepare(BusinessActionContext context, Message message) | ||
throws MQBrokerException, RemotingException, InterruptedException, MQClientException { | ||
SendResult sendResult = RocketMQUtils.halfSend(defaultMQProducer, message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
此处有原子性问题,进入prepare分支已经注册到tc端,发出半消息后,rm宕机,此时sendResult已经不存在了,此时mq消息永久处于不可见状态
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
有记录transactionID么? 是不是可以通过服务端的回查来决定是否commit or rollback?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
有记录transactionID么? 是不是可以通过服务端的回查来决定是否commit or rollback?
seata这边是回查不了的,所以只能保证commit/rollback主动通知,由于分支注册和发送半消息存在原子性问他,如果发送半消息后rm侧宕机,那么这个半消息回查可能是无法回查到具体的事务状态,所以目前我们打算出现这种原子性导致的低概率事件时,使用无限重试回查来使这个消息ttl到直接删除即可,因为半消息对consumer是不可见的,所以对一致性没影响,只要保证commit能一定通知到rocketmq即可
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1.commit时由seata主动触发 (目前pr已实现)
2.事务发生rollback时,由seata主动rollback,如果出现原子性问题导致分支没注册上先发了消息,那么反之这个消息是不可见的就可以不管,回查我们的listener直接unkown就好了(后半部分这块要第三点做出来)
3.需要seata自己的listener不能跟业务的耦合一起,需要重构出一个新的seatadefaultproducer类供用户使用
4.branch type增加rocketmq,这块要重构脱离tcc (重构)
* @throws UnknownHostException | ||
* @throws MQBrokerException | ||
* @throws RemotingException | ||
* @throws NoSuchFieldException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
注释的异常不对应代码
* @throws UnknownHostException | ||
* @throws MQBrokerException | ||
* @throws RemotingException | ||
* @throws NoSuchFieldException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
注释的异常不对应代码
@@ -134,6 +112,25 @@ public GlobalTransactionalInterceptorHandler(FailureHandler failureHandler, Set< | |||
this.aspectTransactional = aspectTransactional; | |||
} | |||
|
|||
private void initDefaultGlobalTransactionTimeout() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个没必要挪位置吧,看不出来改动了什么
@@ -24,10 +24,6 @@ | |||
*/ | |||
public class CommonFenceStoreSqls { | |||
|
|||
private CommonFenceStoreSqls() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
为什么夹杂这么多tccfence的改动,而且看起来好像都是调了位置而已
|
||
@Override | ||
public void shutdown() { | ||
target.shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
4.9.3以下的shutdown有内存泄漏问题,这个可能要考虑一下怎样提醒用户
Please resolve code conflicts |
Ⅰ. Describe what this PR did
透过Spring的后置代理器实现了RocketMQ Bean的拦截,并返回了静态代理类.
Ⅱ. Does this pull request fix one issue?
fixes #3752.
Ⅲ. Test case
https://github.com/GasolLY/seata-rocketmq-test
Ⅳ. Describe how to verify it
Ⅴ. Special notes for reviews