Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize: splitting MergedWarpMessage enhances the server parallel processing capability #6807

Open
wants to merge 13 commits into
base: 2.x
Choose a base branch
from
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6898](https://github.com/apache/incubator-seata/pull/6898)] upgrade npmjs version in saga module
- [[#6879](https://github.com/apache/incubator-seata/pull/6879)] fix log argument mismatch issue
- [[#6902](https://github.com/apache/incubator-seata/pull/6900)] optimize readme docs
- [[#6807](https://github.com/apache/incubator-seata/pull/6807)] splitting MergedWarpMessage enhances the server parallel processing capability


### refactor:
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- [[#6879](https://github.com/apache/incubator-seata/pull/6879)] 修复日志参数不匹配问题
- [[#6898](https://github.com/apache/incubator-seata/pull/6898)] 升级 saga 模块 npmjs 版本
- [[#6902](https://github.com/apache/incubator-seata/pull/6900)] 优化 readme 文档
- [[#6807](https://github.com/apache/incubator-seata/pull/6807)] 分离merge消息使其能完全并行处理


### refactor:
Expand Down
19 changes: 13 additions & 6 deletions core/src/main/java/org/apache/seata/core/protocol/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class Version {
private static final String CURRENT = VersionInfo.VERSION;
private static final String VERSION_0_7_1 = "0.7.1";
private static final String VERSION_1_5_0 = "1.5.0";
private static final String VERSION_2_3_0 = "2.3.0";
private static final int MAX_VERSION_DOT = 3;

/**
Expand Down Expand Up @@ -86,15 +87,21 @@ public static String getChannelVersion(Channel c) {
* @return true: client version is above or equal version 1.5.0, false: on the contrary
*/
public static boolean isAboveOrEqualVersion150(String version) {
boolean isAboveOrEqualVersion150 = false;
return isAboveOrEqualVersion(version, VERSION_1_5_0);
}

public static boolean isAboveOrEqualVersion230(String version) {
return isAboveOrEqualVersion(version, VERSION_2_3_0);
}

public static boolean isAboveOrEqualVersion(String clientVersion, String divideVersion) {
boolean isAboveOrEqualVersion = false;
try {
long clientVersion = convertVersion(version);
long divideVersion = convertVersion(VERSION_1_5_0);
isAboveOrEqualVersion150 = clientVersion >= divideVersion;
isAboveOrEqualVersion = convertVersion(clientVersion) >= convertVersion(divideVersion);
} catch (Exception e) {
LOGGER.error("convert version error, clientVersion:{}", version, e);
LOGGER.error("convert version error, clientVersion:{}", clientVersion, e);
}
return isAboveOrEqualVersion150;
return isAboveOrEqualVersion;
}

public static long convertVersion(String version) throws IncompatibleVersionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
*/
protected final Map<Integer, MergeMessage> mergeMsgMap = new ConcurrentHashMap<>();

protected final Map<Integer, Integer> childToParentMap = new ConcurrentHashMap<>();

/**
* When batch sending is enabled, the message will be stored to basketMap
* Send via asynchronous thread {@link AbstractNettyRemotingClient.MergedSendRunnable}
Expand Down Expand Up @@ -203,8 +205,15 @@ public void sendAsyncRequest(Channel channel, Object msg) {
RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage
? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
: ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
if (rpcMessage.getBody() instanceof MergeMessage) {
mergeMsgMap.put(rpcMessage.getId(), (MergeMessage) rpcMessage.getBody());
Object body = rpcMessage.getBody();
if (body instanceof MergeMessage) {
mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)rpcMessage.getBody());
if (body instanceof MergedWarpMessage) {
Integer parentId = rpcMessage.getId();
for (Integer msgId : ((MergedWarpMessage)rpcMessage.getBody()).msgIds) {
childToParentMap.put(msgId, parentId);
}
}
}
super.sendAsync(channel, rpcMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.protocol.AbstractMessage;
import org.apache.seata.core.protocol.HeartbeatMessage;
import org.apache.seata.core.protocol.MergedWarpMessage;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.protocol.Version;
import org.apache.seata.core.rpc.RemotingServer;
import org.apache.seata.core.rpc.RpcContext;
import org.apache.seata.core.rpc.processor.Pair;
Expand Down Expand Up @@ -270,4 +274,32 @@ public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Excep
}

}

@Override
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
Object body = rpcMessage.getBody();
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
// If the client is not version 2.3.0 or higher, splitting MergedWarpMessage will result in the client’s mergeMsgMap not being cleared
if (body instanceof MergedWarpMessage && (StringUtils.isNotBlank(rpcContext.getVersion())
&& Version.isAboveOrEqualVersion230(rpcContext.getVersion()))) {
MergedWarpMessage mergedWarpMessage = (MergedWarpMessage)body;
for (int i = 0; i < mergedWarpMessage.msgs.size(); i++) {
RpcMessage rpcMsg =
buildRequestMessage(mergedWarpMessage.msgs.get(i), rpcMessage, mergedWarpMessage.msgIds.get(i));
super.processMessage(ctx, rpcMsg);
}
} else {
super.processMessage(ctx, rpcMessage);
}
}

private RpcMessage buildRequestMessage(AbstractMessage msg, RpcMessage rpcMessage,int id) {
RpcMessage rpcMsg = new RpcMessage();
rpcMsg.setId(id);
rpcMsg.setCodec(rpcMessage.getCodec());
rpcMsg.setCompressor(rpcMessage.getCompressor());
rpcMsg.setBody(msg);
return rpcMsg;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class NettyRemotingServer extends AbstractNettyRemotingServer {

private final AtomicBoolean initialized = new AtomicBoolean(false);

private ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(NettyServerConfig.getMinBranchResultPoolSize(),
private final ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(NettyServerConfig.getMinBranchResultPoolSize(),
NettyServerConfig.getMaxBranchResultPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("BranchResultHandlerThread", NettyServerConfig.getMaxBranchResultPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ private void registerProcessor() {
super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
// 4.registry TC response processor
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), childToParentMap, getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.seata.common.util.StringUtils.isNotBlank;

/**
* The rm netty client.
*
Expand Down Expand Up @@ -187,7 +189,7 @@ public void init() {
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
if (org.apache.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
if (isNotBlank(transactionServiceGroup)) {
initConnection();
}
}
Expand Down Expand Up @@ -247,7 +249,7 @@ protected Function<String, NettyPoolKey> getPoolKeyFunction() {
private void registerProcessor() {
// 1.registry TC response processor
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), childToParentMap, getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ public class ClientOnResponseProcessor implements RemotingProcessor {
/**
* The Merge msg map from org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#mergeMsgMap.
*/
private Map<Integer, MergeMessage> mergeMsgMap;
private final Map<Integer, MergeMessage> mergeMsgMap;

private final Map<Integer, Integer> childToParentMap;

/**
* The Futures from org.apache.seata.core.rpc.netty.AbstractNettyRemoting#futures
Expand All @@ -82,9 +84,10 @@ public class ClientOnResponseProcessor implements RemotingProcessor {
private final TransactionMessageHandler transactionMessageHandler;

public ClientOnResponseProcessor(Map<Integer, MergeMessage> mergeMsgMap,
ConcurrentHashMap<Integer, MessageFuture> futures,
ConcurrentHashMap<Integer, MessageFuture> futures, Map<Integer,Integer> childToParentMap,
TransactionMessageHandler transactionMessageHandler) {
this.mergeMsgMap = mergeMsgMap;
this.childToParentMap = childToParentMap;
this.futures = futures;
this.transactionMessageHandler = transactionMessageHandler;
}
Expand All @@ -97,6 +100,8 @@ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exc
for (int i = 0; i < mergeMessage.msgs.size(); i++) {
int msgId = mergeMessage.msgIds.get(i);
MessageFuture future = futures.remove(msgId);
// The old version of the server will return MergeResultMessage, so it is necessary to remove the msgId from the childToParentMap.
childToParentMap.remove(msgId);
if (future == null) {
LOGGER.error("msg: {} is not found in futures, result message: {}", msgId,results.getMsgs()[i]);
} else {
Expand All @@ -109,6 +114,8 @@ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exc
for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) {
int msgId = batchResultMessage.getMsgIds().get(i);
MessageFuture future = futures.remove(msgId);
// The old version of the server will return BatchResultMessage, so it is necessary to remove the msgId from the childToParentMap.
childToParentMap.remove(msgId);
if (future == null) {
LOGGER.error("msg: {} is not found in futures, result message: {}", msgId, batchResultMessage.getResultMessages().get(i));
} else {
Expand All @@ -122,15 +129,25 @@ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exc
mergeMsgMap.clear();
}
} else {
MessageFuture messageFuture = futures.remove(rpcMessage.getId());
if (messageFuture != null) {
messageFuture.setResultMessage(rpcMessage.getBody());
} else {
if (rpcMessage.getBody() instanceof AbstractResultMessage) {
if (transactionMessageHandler != null) {
transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null);
Integer id = rpcMessage.getId();
try {
MessageFuture messageFuture = futures.remove(id);
if (messageFuture != null) {
messageFuture.setResultMessage(rpcMessage.getBody());
} else {
if (rpcMessage.getBody() instanceof AbstractResultMessage) {
if (transactionMessageHandler != null) {
transactionMessageHandler.onResponse((AbstractResultMessage)rpcMessage.getBody(), null);
}
}
}
} finally {
// In version 2.3.0, the server does not return MergeResultMessage and BatchResultMessage
// so it is necessary to clear childToParentMap and mergeMsgMap here.
Integer parentId = childToParentMap.remove(id);
if (parentId != null) {
mergeMsgMap.remove(parentId);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.ConfigurationTestHelper;
import org.apache.seata.common.XID;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.UUIDGenerator;
import org.apache.seata.core.protocol.ResultCode;
import org.apache.seata.core.protocol.transaction.BranchRegisterRequest;
import org.apache.seata.core.protocol.transaction.BranchRegisterResponse;
import org.apache.seata.rm.tcc.TCCResourceManager;
import org.apache.seata.saga.engine.db.AbstractServerTest;
import org.apache.seata.server.coordinator.DefaultCoordinator;
import org.apache.seata.server.session.SessionHolder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.Channel;

public class RmNettyClientTest extends AbstractServerTest {

private static final Logger LOGGER = LoggerFactory.getLogger(RmNettyClientTest.class);

@BeforeAll
public static void init(){
ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, "8091");
}
@AfterAll
public static void after() {
ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL);
}

public static ThreadPoolExecutor initMessageExecutor() {
return new ThreadPoolExecutor(5, 5, 500, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(20000), new ThreadPoolExecutor.CallerRunsPolicy());
}

@Test
public void testMergeMsg() throws Exception {
ThreadPoolExecutor workingThreads = initMessageExecutor();
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
new Thread(() -> {
SessionHolder.init(null);
nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer));
// set registry
XID.setIpAddress(NetUtil.getLocalIp());
XID.setPort(8091);
// init snowflake for transactionId, branchId
UUIDGenerator.init(1L);
nettyRemotingServer.init();
}).start();
Thread.sleep(3000);

String applicationId = "app 1";
String transactionServiceGroup = "default_tx_group";
RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
rmNettyRemotingClient.setResourceManager(new TCCResourceManager());
rmNettyRemotingClient.init();
rmNettyRemotingClient.getClientChannelManager().initReconnect(transactionServiceGroup, true);
String serverAddress = "0.0.0.0:8091";
Channel channel = RmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress);
Assertions.assertNotNull(channel);

CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
CompletableFuture.runAsync(()->{
BranchRegisterRequest request = new BranchRegisterRequest();
request.setXid("127.0.0.1:8091:1249853");
request.setLockKey("lock key testSendMsgWithResponse");
request.setResourceId("resoutceId1");
BranchRegisterResponse branchRegisterResponse = null;
try {
branchRegisterResponse = (BranchRegisterResponse) rmNettyRemotingClient.sendSyncRequest(request);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
Assertions.assertNotNull(branchRegisterResponse);
Assertions.assertEquals(ResultCode.Failed, branchRegisterResponse.getResultCode());
Assertions.assertEquals("TransactionException[Could not found global transaction xid = 127.0.0.1:8091:1249853, may be has finished.]",
branchRegisterResponse.getMsg());
latch.countDown();
});
}
latch.await(10,TimeUnit.SECONDS);
nettyRemotingServer.destroy();
rmNettyRemotingClient.destroy();
}

}
Loading
Loading