Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Integrate RocketMQ into Seata #3974

Closed
wants to merge 25 commits into from

Conversation

GasolLY
Copy link

@GasolLY GasolLY commented Aug 23, 2021

Ⅰ. Describe what this PR did

透过Spring的后置代理器实现了RocketMQ Bean的拦截,并返回了静态代理类.

Ⅱ. Does this pull request fix one issue?

fixes #3752.

Ⅲ. Test case

https://github.com/GasolLY/seata-rocketmq-test

Ⅳ. Describe how to verify it

Ⅴ. Special notes for reviews

@slievrly slievrly added type: feature Category issues or prs related to feature request. Summer2021 first-time contributor first-time contributor labels Aug 24, 2021
@codecov-commenter
Copy link

codecov-commenter commented Sep 28, 2021

Codecov Report

Merging #3974 (6a85838) into 2.x (25999c3) will decrease coverage by 0.23%.
The diff coverage is 6.25%.

📣 This organization is not using Codecov’s GitHub App Integration. We recommend you install it so Codecov can continue to function properly for your repositories. Learn more

Impacted file tree graph

@@             Coverage Diff              @@
##                2.x    #3974      +/-   ##
============================================
- Coverage     48.42%   48.20%   -0.23%     
+ Complexity     4171     4168       -3     
============================================
  Files           764      769       +5     
  Lines         26892    26979      +87     
  Branches       3348     3348              
============================================
- Hits          13023    13004      -19     
- Misses        12467    12591     +124     
+ Partials       1402     1384      -18     
Impacted Files Coverage Δ
...ration/tx/api/fence/DefaultCommonFenceHandler.java 0.00% <0.00%> (ø)
...gration/tx/api/fence/config/CommonFenceConfig.java 0.00% <0.00%> (ø)
...pi/fence/store/db/CommonFenceStoreDataBaseDAO.java 0.00% <0.00%> (ø)
...x/api/fence/store/db/sql/CommonFenceStoreSqls.java 0.00% <0.00%> (ø)
...erceptor/parser/DefaultResourceRegisterParser.java 0.00% <0.00%> (ø)
...ata/integration/tx/api/json/DefaultJsonParser.java 0.00% <0.00%> (ø)
.../autoconfigure/SeataRocketMQAutoConfiguration.java 0.00% <0.00%> (ø)
...ain/java/io/seata/rm/fence/SpringFenceHandler.java 0.00% <0.00%> (ø)
...otation/scannercheckers/PackageScannerChecker.java 0.00% <0.00%> (ø)
.../java/io/seata/rm/tcc/rocketmq/RocketMQAspect.java 0.00% <0.00%> (ø)
... and 20 more

…re-tcc-rocketmq

# Conflicts:
#	tcc/pom.xml
#	tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQ.java
#	tcc/src/main/java/io/seata/rm/tcc/rocketmq/TCCRocketMQImpl.java
…re-tcc-rocketmq

# Conflicts:
#	seata-spring-boot-starter/src/main/java/io/seata/spring/boot/autoconfigure/RocketMQAutoConfiguration.java
@slievrly slievrly added this to the 2.0.0 milestone Oct 8, 2021
seata-spring-boot-starter/pom.xml Show resolved Hide resolved
tcc/pom.xml Outdated Show resolved Hide resolved
tcc/pom.xml Show resolved Hide resolved
SendResult sendResult = RocketMQUtils.halfSend(defaultMQProducer, message);
LOGGER.info("RocketMQ message send prepare, xid = {}, bid = {}", context.getXid(), context.getBranchId());
Map<String, Object> params = new HashMap<>(2);
params.put("message", message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没必要传递msg

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I will fix it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果只是为了后面endtransaction,这里是不用保持mesage的,sendresult中的信息足够。 但是如果MQ的客户端设置了EndTransactionHook,内部实现回查message中获取TransactionId, 如果没有message传入,应该会抛异常出来。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ShannonDing 这种情况将有后续seata独立的seatadefaultproducer来处理,不会设置EndTransactionHook

# Conflicts:
#	seata-spring-boot-starter/src/main/resources/META-INF/spring.factories
@wxbty
Copy link
Member

wxbty commented Oct 20, 2021

Aop is based on dynamic proxy, the middleware layer should use the underlying technology

@GasolLY
Copy link
Author

GasolLY commented Oct 20, 2021

Aop is based on dynamic proxy, the middleware layer should use the underlying technology
Get it😁

@funky-eyes
Copy link
Contributor

再补充一下测试用例

@GasolLY GasolLY changed the title [WIP]Integrate RocketMQ into Seata Integrate RocketMQ into Seata Mar 1, 2022
@GasolLY GasolLY changed the title Integrate RocketMQ into Seata feature:Integrate RocketMQ into Seata Mar 1, 2022
@Override
public SendResult prepare(BusinessActionContext context, Message message)
throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
SendResult sendResult = RocketMQUtils.halfSend(defaultMQProducer, message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

此处有原子性问题,进入prepare分支已经注册到tc端,发出半消息后,rm宕机,此时sendResult已经不存在了,此时mq消息永久处于不可见状态

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有记录transactionID么? 是不是可以通过服务端的回查来决定是否commit or rollback?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

有记录transactionID么? 是不是可以通过服务端的回查来决定是否commit or rollback?

seata这边是回查不了的,所以只能保证commit/rollback主动通知,由于分支注册和发送半消息存在原子性问他,如果发送半消息后rm侧宕机,那么这个半消息回查可能是无法回查到具体的事务状态,所以目前我们打算出现这种原子性导致的低概率事件时,使用无限重试回查来使这个消息ttl到直接删除即可,因为半消息对consumer是不可见的,所以对一致性没影响,只要保证commit能一定通知到rocketmq即可

@GasolLY
Copy link
Author

GasolLY commented May 27, 2022

再补充一下测试用例

Fixed: https://github.com/GasolLY/seata-rocketmq-test

@CLAassistant
Copy link

CLAassistant commented Dec 12, 2022

CLA assistant check
All committers have signed the CLA.

@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
1 out of 4 committers have signed the CLA.

✅ wangliang181230
❌ GasolLY
❌ a364176773
❌ slievrly
You have signed the CLA already but the status is still pending? Let us recheck it.

@funky-eyes funky-eyes changed the title feature:Integrate RocketMQ into Seata feature: Integrate RocketMQ into Seata Jan 30, 2023
@funky-eyes funky-eyes changed the base branch from develop to 2.x January 30, 2023 05:29
Copy link
Contributor

@funky-eyes funky-eyes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1.commit时由seata主动触发 (目前pr已实现)
2.事务发生rollback时,由seata主动rollback,如果出现原子性问题导致分支没注册上先发了消息,那么反之这个消息是不可见的就可以不管,回查我们的listener直接unkown就好了(后半部分这块要第三点做出来)
3.需要seata自己的listener不能跟业务的耦合一起,需要重构出一个新的seatadefaultproducer类供用户使用
4.branch type增加rocketmq,这块要重构脱离tcc (重构)

* @throws UnknownHostException
* @throws MQBrokerException
* @throws RemotingException
* @throws NoSuchFieldException
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

注释的异常不对应代码

* @throws UnknownHostException
* @throws MQBrokerException
* @throws RemotingException
* @throws NoSuchFieldException
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

注释的异常不对应代码

@@ -134,6 +112,25 @@ public GlobalTransactionalInterceptorHandler(FailureHandler failureHandler, Set<
this.aspectTransactional = aspectTransactional;
}

private void initDefaultGlobalTransactionTimeout() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个没必要挪位置吧,看不出来改动了什么

@@ -24,10 +24,6 @@
*/
public class CommonFenceStoreSqls {

private CommonFenceStoreSqls() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么夹杂这么多tccfence的改动,而且看起来好像都是调了位置而已


@Override
public void shutdown() {
target.shutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4.9.3以下的shutdown有内存泄漏问题,这个可能要考虑一下怎样提醒用户

@funky-eyes funky-eyes removed this from the 2.0.0 milestone Nov 4, 2023
@funky-eyes funky-eyes added this to the 2.1.0 milestone Nov 27, 2023
@funky-eyes
Copy link
Contributor

Please resolve code conflicts

@Bughue
Copy link
Contributor

Bughue commented Jan 18, 2024

#6230

@funky-eyes funky-eyes removed this from the 2.1.0 milestone Mar 1, 2024
@funky-eyes funky-eyes closed this Mar 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
first-time contributor first-time contributor Summer2021 type: feature Category issues or prs related to feature request.
Projects
Status: Done
Development

Successfully merging this pull request may close these issues.

[Summer2021] Integrate RocketMQ into Seata
9 participants