diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index fcdb6c6523a..2dd147c01b5 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -5,6 +5,7 @@ Add changes here for all PR submitted to the 2.x branch. ### feature: - [[#6876](https://github.com/apache/incubator-seata/pull/6876)]support kingbase +- [[#6881](https://github.com/apache/incubator-seata/pull/6881)]support grpc ### bugfix: @@ -35,6 +36,7 @@ Thanks to these contributors for their code commits. Please report an unintended - [dk2k](https://github.com/dk2k) - [MaoMaoandSnail](https://github.com/MaoMaoandSnail) - [yougecn](https://github.com/yougecn) +- [PleaseGiveMeTheCoke](https://github.com/PleaseGiveMeTheCoke) diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 016c32c4651..624db67f904 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -4,6 +4,7 @@ ### feature: [[#6876](https://github.com/apache/incubator-seata/pull/6876)]支持人大金仓数据库(kingbase) +[[#6881](https://github.com/apache/incubator-seata/pull/6881)]全链路支持grpc ### bugfix: @@ -35,7 +36,7 @@ - [dk2k](https://github.com/dk2k) - [MaoMaoandSnail](https://github.com/MaoMaoandSnail) - [yougecn](https://github.com/yougecn) - +- [PleaseGiveMeTheCoke](https://github.com/PleaseGiveMeTheCoke) 同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。 diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index dec76f51b7c..ff8436b6dc9 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -628,6 +628,8 @@ public interface ConfigurationKeys { @Deprecated String ENABLE_CLIENT_BATCH_SEND_REQUEST = TRANSPORT_PREFIX + "enableClientBatchSendRequest"; + String TRANSPORT_PROTOCOL = TRANSPORT_PREFIX + "protocol"; + /** * The constant ENABLE_TM_CLIENT_BATCH_SEND_REQUEST */ diff --git a/common/src/main/java/org/apache/seata/common/DefaultValues.java b/common/src/main/java/org/apache/seata/common/DefaultValues.java index 8c484a1ab0f..eb0d40bb308 100644 --- a/common/src/main/java/org/apache/seata/common/DefaultValues.java +++ b/common/src/main/java/org/apache/seata/common/DefaultValues.java @@ -63,6 +63,7 @@ public interface DefaultValues { String DEFAULT_BOSS_THREAD_PREFIX = "NettyBoss"; String DEFAULT_NIO_WORKER_THREAD_PREFIX = "NettyServerNIOWorker"; String DEFAULT_EXECUTOR_THREAD_PREFIX = "NettyServerBizHandler"; + String DEFAULT_PROTOCOL = "seata"; boolean DEFAULT_TRANSPORT_HEARTBEAT = true; boolean DEFAULT_TRANSACTION_UNDO_DATA_VALIDATION = true; diff --git a/core/pom.xml b/core/pom.xml index 9de6107bc03..26ce3dc9018 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -69,6 +69,10 @@ fastjson test + + com.google.protobuf + protobuf-java + @@ -90,6 +94,23 @@ + + org.xolstice.maven.plugins + protobuf-maven-plugin + + ${project.basedir}/src/main/resources/protobuf/org/apache/seata/protocol/transcation/ + + com.google.protobuf:protoc:3.25.4:exe:${os.detected.classifier} + + + + + + compile + + + + diff --git a/core/src/main/java/org/apache/seata/core/protocol/Protocol.java b/core/src/main/java/org/apache/seata/core/protocol/Protocol.java new file mode 100644 index 00000000000..fe3cc000cfc --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/protocol/Protocol.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.protocol; + +/** + * seata transport protocol + */ +public enum Protocol { + + /** + * grpc + */ + GPRC("grpc"), + + /** + * seata + */ + SEATA("seata"); + + public final String value; + + Protocol(String value) { + this.value = value; + } +} diff --git a/core/src/main/java/org/apache/seata/core/protocol/detector/Http2Detector.java b/core/src/main/java/org/apache/seata/core/protocol/detector/Http2Detector.java new file mode 100644 index 00000000000..a004894f9b1 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/protocol/detector/Http2Detector.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.protocol.detector; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2StreamChannel; +import io.netty.util.CharsetUtil; +import org.apache.seata.core.rpc.netty.grpc.GrpcDecoder; +import org.apache.seata.core.rpc.netty.grpc.GrpcEncoder; + +public class Http2Detector implements ProtocolDetector { + private static final byte[] HTTP2_PREFIX_BYTES = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(CharsetUtil.UTF_8); + private ChannelHandler[] serverHandlers; + + public Http2Detector(ChannelHandler[] serverHandlers) { + this.serverHandlers = serverHandlers; + } + + @Override + public boolean detect(ByteBuf in) { + if (in.readableBytes() < HTTP2_PREFIX_BYTES.length) { + return false; + } + for (int i = 0; i < HTTP2_PREFIX_BYTES.length; i++) { + if (in.getByte(i) != HTTP2_PREFIX_BYTES[i]) { + return false; + } + } + return true; + } + + @Override + public ChannelHandler[] getHandlers() { + return new ChannelHandler[]{ + Http2FrameCodecBuilder.forServer().build(), + new Http2MultiplexHandler(new ChannelInitializer() { + @Override + protected void initChannel(Http2StreamChannel ch) { + final ChannelPipeline p = ch.pipeline(); + p.addLast(new GrpcDecoder()); + p.addLast(new GrpcEncoder()); + p.addLast(serverHandlers); + } + }) + }; + } +} \ No newline at end of file diff --git a/core/src/main/java/org/apache/seata/core/protocol/detector/ProtocolDetector.java b/core/src/main/java/org/apache/seata/core/protocol/detector/ProtocolDetector.java new file mode 100644 index 00000000000..89d5d10e7be --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/protocol/detector/ProtocolDetector.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.protocol.detector; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; + +public interface ProtocolDetector { + boolean detect(ByteBuf in); + + ChannelHandler[] getHandlers(); +} diff --git a/core/src/main/java/org/apache/seata/core/protocol/detector/SeataDetector.java b/core/src/main/java/org/apache/seata/core/protocol/detector/SeataDetector.java new file mode 100644 index 00000000000..b9c30b0bc6d --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/protocol/detector/SeataDetector.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.protocol.detector; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import org.apache.seata.core.rpc.netty.MultiProtocolDecoder; + +public class SeataDetector implements ProtocolDetector { + private static final byte[] MAGIC_CODE_BYTES = {(byte) 0xda, (byte) 0xda}; + private ChannelHandler[] serverHandlers; + + public SeataDetector(ChannelHandler[] serverHandlers) { + this.serverHandlers = serverHandlers; + } + + @Override + public boolean detect(ByteBuf in) { + if (in.readableBytes() < MAGIC_CODE_BYTES.length) { + return false; + } + for (int i = 0; i < MAGIC_CODE_BYTES.length; i++) { + if (in.getByte(i) != MAGIC_CODE_BYTES[i]) { + return false; + } + } + return true; + } + + @Override + public ChannelHandler[] getHandlers() { + MultiProtocolDecoder multiProtocolDecoder = new MultiProtocolDecoder(serverHandlers); + + return new ChannelHandler[]{multiProtocolDecoder}; + } +} \ No newline at end of file diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java index 4aaafc0acb0..0fbd9ff0795 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java @@ -18,8 +18,11 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; @@ -28,13 +31,19 @@ import io.netty.channel.epoll.EpollMode; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2StreamChannelBootstrap; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.internal.PlatformDependent; import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.core.protocol.Protocol; import org.apache.seata.core.rpc.RemotingBootstrap; +import org.apache.seata.core.rpc.netty.grpc.GrpcDecoder; +import org.apache.seata.core.rpc.netty.grpc.GrpcEncoder; import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1; import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1; import org.slf4j.Logger; @@ -130,14 +139,18 @@ public void start() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); - pipeline - .addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), - nettyClientConfig.getChannelMaxWriteIdleSeconds(), - nettyClientConfig.getChannelMaxAllIdleSeconds())) - .addLast(new ProtocolDecoderV1()) - .addLast(new ProtocolEncoderV1()); - if (channelHandlers != null) { - addChannelPipelineLast(ch, channelHandlers); + if (nettyClientConfig.getProtocol().equals(Protocol.GPRC.value)) { + pipeline.addLast(Http2FrameCodecBuilder.forClient().build()) + .addLast(new Http2MultiplexHandler(new ChannelDuplexHandler())); + } else { + pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), + nettyClientConfig.getChannelMaxWriteIdleSeconds(), + nettyClientConfig.getChannelMaxAllIdleSeconds())); + pipeline.addLast(new ProtocolDecoderV1()) + .addLast(new ProtocolEncoderV1()); + if (channelHandlers != null) { + addChannelPipelineLast(ch, channelHandlers); + } } } }); @@ -177,9 +190,30 @@ public Channel getNewChannel(InetSocketAddress address) { } else { channel = f.channel(); } + + if (nettyClientConfig.getProtocol().equals(Protocol.GPRC.value)) { + Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(channel); + bootstrap.handler(new ChannelInboundHandlerAdapter() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + Channel channel = ctx.channel(); + channel.pipeline().addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), + nettyClientConfig.getChannelMaxWriteIdleSeconds(), + nettyClientConfig.getChannelMaxAllIdleSeconds())); + channel.pipeline().addLast(new GrpcDecoder()); + channel.pipeline().addLast(new GrpcEncoder()); + if (channelHandlers != null) { + addChannelPipelineLast(channel, channelHandlers); + } + } + }); + channel = bootstrap.open().get(); + } + } catch (Exception e) { throw new FrameworkException(e, "can not connect to services-server."); } + return channel; } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java index f0e047ad58d..68583608262 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java @@ -21,6 +21,7 @@ import org.apache.seata.core.rpc.TransportServerType; import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST; +import static org.apache.seata.common.DefaultValues.DEFAULT_PROTOCOL; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_RM_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TM_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_SELECTOR_THREAD_PREFIX; @@ -451,6 +452,10 @@ public String getRmDispatchThreadPrefix() { return RPC_DISPATCH_THREAD_PREFIX + "_" + NettyPoolKey.TransactionRole.RMROLE.name(); } + public String getProtocol() { + return CONFIG.getConfig(org.apache.seata.common.ConfigurationKeys.TRANSPORT_PROTOCOL, DEFAULT_PROTOCOL); + } + @Deprecated public static boolean isEnableClientBatchSendRequest() { return ENABLE_CLIENT_BATCH_SEND_REQUEST; diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java index c7b2aa57c21..b589396e5ab 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java @@ -92,6 +92,10 @@ protected void setChannelHandlers(final ChannelHandler... handlers) { } } + protected ChannelHandler[] getChannelHandlers() { + return channelHandlers; + } + /** * Add channel pipeline last. * @@ -158,10 +162,8 @@ public void start() { .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) { - MultiProtocolDecoder multiProtocolDecoder = new MultiProtocolDecoder(channelHandlers); - ch.pipeline() - .addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) - .addLast(multiProtocolDecoder); + ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) + .addLast(new ProtocolDetectHandler(NettyServerBootstrap.this)); } }); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java new file mode 100644 index 00000000000..9f1b5f8c113 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.seata.core.protocol.detector.Http2Detector; +import org.apache.seata.core.protocol.detector.ProtocolDetector; +import org.apache.seata.core.protocol.detector.SeataDetector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class ProtocolDetectHandler extends ByteToMessageDecoder { + private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDetectHandler.class); + private NettyServerBootstrap nettyServerBootstrap; + private ProtocolDetector[] supportedProtocolDetectors; + + public ProtocolDetectHandler(NettyServerBootstrap nettyServerBootstrap) { + this.nettyServerBootstrap = nettyServerBootstrap; + this.supportedProtocolDetectors = new ProtocolDetector[]{new Http2Detector(nettyServerBootstrap.getChannelHandlers()), new SeataDetector(nettyServerBootstrap.getChannelHandlers())}; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + for (ProtocolDetector protocolDetector : supportedProtocolDetectors) { + if (protocolDetector.detect(in)) { + ChannelHandler[] protocolHandlers = protocolDetector.getHandlers(); + ctx.pipeline().addLast(protocolHandlers); + ctx.pipeline().remove(this); + + in.resetReaderIndex(); + return; + } + + in.resetReaderIndex(); + } + + byte[] preface = new byte[in.readableBytes()]; + in.readBytes(preface); + LOGGER.error("Can not recognize protocol from remote {}, preface = {}", ctx.channel().remoteAddress(), preface); + in.clear(); + ctx.close(); + } +} \ No newline at end of file diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java new file mode 100644 index 00000000000..71c9caf8be9 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.grpc; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2HeadersFrame; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; +import org.apache.commons.lang.StringUtils; +import org.apache.seata.core.compressor.Compressor; +import org.apache.seata.core.compressor.CompressorFactory; +import org.apache.seata.core.protocol.HeartbeatMessage; +import org.apache.seata.core.protocol.ProtocolConstants; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.generated.GrpcMessageProto; +import org.apache.seata.core.serializer.Serializer; +import org.apache.seata.core.serializer.SerializerServiceLoader; +import org.apache.seata.core.serializer.SerializerType; + +import java.util.Map; + +public class GrpcDecoder extends ChannelDuplexHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Http2HeadersFrame) { + onHeadersRead(ctx, (Http2HeadersFrame) msg); + } else if (msg instanceof Http2DataFrame) { + onDataRead(ctx, (Http2DataFrame) msg); + } else if (msg instanceof ReferenceCounted) { + ReferenceCountUtil.release(msg); + } + } + + public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exception { + ByteBuf content = msg.content(); + try { + int readableBytes = content.readableBytes(); + byte[] bytes = new byte[readableBytes]; + content.readBytes(bytes); + if (bytes.length < 5) { + return; + } + + int srcPos = 0; + while (srcPos < readableBytes) { + // The first byte defaults to 0, indicating that no decompression is required + // Read the value of the next four bytes as the length of the body + int length = ((bytes[srcPos + 1] & 0xFF) << 24) | ((bytes[srcPos + 2] & 0xFF) << 16) + | ((bytes[srcPos + 3] & 0xFF) << 8) | (bytes[srcPos + 4] & 0xFF); + + byte[] data = new byte[length]; + System.arraycopy(bytes, srcPos + 5, data, 0, length); + GrpcMessageProto grpcMessageProto = GrpcMessageProto.parseFrom(data); + byte[] bodyBytes = grpcMessageProto.getBody().toByteArray(); + int messageType = grpcMessageProto.getMessageType(); + int messageId = grpcMessageProto.getId(); + Map headMap = grpcMessageProto.getHeadMapMap(); + + RpcMessage rpcMsg = new RpcMessage(); + if (messageType <= Byte.MAX_VALUE && messageType >= Byte.MIN_VALUE) { + rpcMsg.setMessageType((byte) messageType); + } + rpcMsg.setId(messageId); + rpcMsg.setHeadMap(grpcMessageProto.getHeadMapMap()); + + if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) { + rpcMsg.setBody(HeartbeatMessage.PING); + } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { + rpcMsg.setBody(HeartbeatMessage.PONG); + } else { + String compressType = headMap.get(GrpcHeaderEnum.COMPRESS_TYPE.header); + if (StringUtils.isNotBlank(compressType)) { + byte compress = Byte.parseByte(compressType); + rpcMsg.setCompressor(compress); + Compressor compressor = CompressorFactory.getCompressor(compress); + bodyBytes = compressor.decompress(bodyBytes); + } + String codecValue = headMap.get(GrpcHeaderEnum.CODEC_TYPE.header); + int codec = Integer.parseInt(codecValue); + SerializerType serializerType = SerializerType.getByCode(codec); + rpcMsg.setCodec(serializerType.getCode()); + Serializer serializer = SerializerServiceLoader.load(serializerType); + Object messageBody = serializer.deserialize(bodyBytes); + rpcMsg.setBody(messageBody); + } + + ctx.fireChannelRead(rpcMsg); + + srcPos += length + 5; + } + } finally { + ReferenceCountUtil.release(content); + } + } + + + public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame headersFrame) throws Exception { + // TODO Subsequent decompression logic is possible + } +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java new file mode 100644 index 00000000000..dbbbfe1be48 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.grpc; + +import com.google.protobuf.ByteString; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.Http2Headers; +import org.apache.seata.core.compressor.Compressor; +import org.apache.seata.core.compressor.CompressorFactory; +import org.apache.seata.core.protocol.ProtocolConstants; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.generated.GrpcMessageProto; +import org.apache.seata.core.serializer.Serializer; +import org.apache.seata.core.serializer.SerializerServiceLoader; +import org.apache.seata.core.serializer.SerializerType; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class GrpcEncoder extends ChannelOutboundHandlerAdapter { + private final AtomicBoolean headerSent = new AtomicBoolean(false); + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (!(msg instanceof RpcMessage)) { + throw new UnsupportedOperationException("GrpcEncoder not support class:" + msg.getClass()); + } + + RpcMessage rpcMessage = (RpcMessage) msg; + byte messageType = rpcMessage.getMessageType(); + Map headMap = rpcMessage.getHeadMap(); + Object body = rpcMessage.getBody(); + int id = rpcMessage.getId(); + + if (headerSent.compareAndSet(false, true)) { + Http2Headers headers = new DefaultHttp2Headers(); + headers.add(GrpcHeaderEnum.HTTP2_STATUS.header, String.valueOf(200)); + headers.add(GrpcHeaderEnum.GRPC_STATUS.header, String.valueOf(0)); + headers.add(GrpcHeaderEnum.GRPC_CONTENT_TYPE.header, "application/grpc"); + ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers)); + } + + ByteString dataBytes; + if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST + && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { + Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.PROTOBUF.getCode())); + byte[] serializedBytes = serializer.serialize(body); + Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor()); + dataBytes = ByteString.copyFrom(compressor.compress(serializedBytes)); + } else { + dataBytes = ByteString.EMPTY; + } + headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.PROTOBUF.getCode())); + headMap.put(GrpcHeaderEnum.COMPRESS_TYPE.header, String.valueOf(rpcMessage.getCompressor())); + GrpcMessageProto.Builder builder = GrpcMessageProto.newBuilder() + .putAllHeadMap(headMap) + .setMessageType(messageType) + .setId(id); + builder.setBody(ByteString.copyFrom(dataBytes.toByteArray())); + GrpcMessageProto grpcMessageProto = builder.build(); + + byte[] bodyBytes = grpcMessageProto.toByteArray(); + if (bodyBytes != null) { + byte[] messageWithPrefix = new byte[bodyBytes.length + 5]; + // The first byte is 0, indicating no compression + messageWithPrefix[0] = 0; + ByteBuffer buffer = ByteBuffer.allocate(4); + buffer.putInt(bodyBytes.length); + byte[] lengthBytes = buffer.array(); + // The last four bytes indicate the length + System.arraycopy(lengthBytes, 0, messageWithPrefix, 1, 4); + // The remaining bytes are body + System.arraycopy(bodyBytes, 0, messageWithPrefix, 5, bodyBytes.length); + ctx.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.wrappedBuffer(messageWithPrefix))); + } + } + +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java new file mode 100644 index 00000000000..ed8e729b47d --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.grpc; + +public enum GrpcHeaderEnum { + + /** + * grpc status + */ + GRPC_STATUS("grpc-status"), + /** + * http2 status + */ + HTTP2_STATUS(":status"), + /** + * content-type + */ + GRPC_CONTENT_TYPE("content-type"), + + /** + * codec-type + */ + CODEC_TYPE("codec-type"), + + /** + * compress-type + */ + COMPRESS_TYPE("compress-type"); + + public final String header; + + GrpcHeaderEnum(String header) { + this.header = header; + } +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java index dd01b948dba..39180f3bdc0 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java @@ -66,7 +66,6 @@ public class ProtocolEncoderV1 extends MessageToByteEncoder implements ProtocolE public void encode(RpcMessage message, ByteBuf out) { try { - ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1(); rpcMessage.rpcMsg2ProtocolMsg(message); diff --git a/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java b/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java index 63ce440edd1..0aa9bd340e8 100644 --- a/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java +++ b/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java @@ -72,8 +72,7 @@ public static Serializer load(SerializerType type, byte version) throws Enhanced "Please manually reference 'org.apache.seata:seata-serializer-protobuf' dependency."); } - - String key = serialzerKey(type, version); + String key = serializerKey(type, version); Serializer serializer = SERIALIZER_MAP.get(key); if (serializer == null) { if (type == SerializerType.SEATA) { @@ -86,7 +85,30 @@ public static Serializer load(SerializerType type, byte version) throws Enhanced return serializer; } - private static String serialzerKey(SerializerType type, byte version) { + /** + * Load the service of {@link Serializer} + * + * @param type the serializer type + * @return the service of {@link Serializer} + * @throws EnhancedServiceNotFoundException the enhanced service not found exception + */ + public static Serializer load(SerializerType type) throws EnhancedServiceNotFoundException { + if (type == SerializerType.PROTOBUF && !CONTAINS_PROTOBUF_DEPENDENCY) { + throw new EnhancedServiceNotFoundException("The class '" + PROTOBUF_SERIALIZER_CLASS_NAME + "' not found. " + + "Please manually reference 'org.apache.seata:seata-serializer-protobuf' dependency."); + } + + String key = type.name(); + Serializer serializer = SERIALIZER_MAP.get(key); + if (serializer == null) { + serializer = EnhancedServiceLoader.load(Serializer.class, type.name()); + + SERIALIZER_MAP.put(key, serializer); + } + return serializer; + } + + private static String serializerKey(SerializerType type, byte version) { if (type == SerializerType.SEATA) { return type.name() + version; } diff --git a/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto b/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto new file mode 100644 index 00000000000..dd61bd95f48 --- /dev/null +++ b/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; +package org.apache.seata.protocol.protobuf; +option java_multiple_files = true; +option java_outer_classname = "GrpcMessage"; +option java_package = "org.apache.seata.core.protocol.generated"; + +message GrpcMessageProto { + int32 id = 1; + int32 messageType = 2; + map headMap = 3; + bytes body = 4; +} + +service SeataService { + rpc sendRequest (stream GrpcMessageProto) returns (stream GrpcMessageProto); +} \ No newline at end of file diff --git a/dependencies/pom.xml b/dependencies/pom.xml index 4bf0450d4f3..15178f14166 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -76,7 +76,6 @@ 4.1.101.Final 4.0.3 1.6.7 - 3.25.4 1.66.0 5.4.0 @@ -623,6 +622,11 @@ grpc-core ${grpc.version} + + io.grpc + grpc-alts + ${grpc.version} + io.grpc grpc-api diff --git a/script/client/conf/file.conf b/script/client/conf/file.conf index 92c836e17a6..4b121660922 100644 --- a/script/client/conf/file.conf +++ b/script/client/conf/file.conf @@ -16,6 +16,8 @@ # transport { + # communication protocols, seata or grpc, default seata + protocol = "seata" # tcp, unix-domain-socket type = "TCP" #NIO, NATIVE diff --git a/script/client/spring/application.properties b/script/client/spring/application.properties index cb7d93c2ab7..2a72d1e5f79 100755 --- a/script/client/spring/application.properties +++ b/script/client/spring/application.properties @@ -64,6 +64,7 @@ seata.log.exception-rate=100 seata.service.vgroup-mapping.default_tx_group=default seata.service.grouplist.default=127.0.0.1:8091 seata.service.disable-global-transaction=false +seata.transport.protocol=seata seata.transport.shutdown.wait=3 seata.transport.thread-factory.boss-thread-prefix=NettyBoss seata.transport.thread-factory.worker-thread-prefix=NettyServerNIOWorker diff --git a/script/client/spring/application.yml b/script/client/spring/application.yml index 9eef693d332..a6100f05740 100755 --- a/script/client/spring/application.yml +++ b/script/client/spring/application.yml @@ -73,6 +73,7 @@ seata: default: 127.0.0.1:8091 disable-global-transaction: false transport: + protocol: seata shutdown: wait: 3 thread-factory: diff --git a/script/config-center/config.txt b/script/config-center/config.txt index 99cd7bd1313..8cf986f3f94 100644 --- a/script/config-center/config.txt +++ b/script/config-center/config.txt @@ -17,6 +17,7 @@ #For details about configuration items, see https://seata.apache.org/zh-cn/docs/user/configurations #Transport configuration, for client and server +transport.protocol=seata transport.type=TCP transport.server=NIO transport.heartbeat=true diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java index 8b4caa4e8f6..64f38a385d0 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java @@ -23,6 +23,7 @@ import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST; import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_TC_SERVER_BATCH_SEND_RESPONSE; import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST; +import static org.apache.seata.common.DefaultValues.DEFAULT_PROTOCOL; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_RM_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TC_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TM_REQUEST_TIMEOUT; @@ -54,6 +55,8 @@ public class TransportProperties { */ private String compressor = "none"; + private String protocol = DEFAULT_PROTOCOL; + /** * enable client batch send request */ @@ -193,4 +196,12 @@ public long getRpcTcRequestTimeout() { public void setRpcTcRequestTimeout(long rpcTcRequestTimeout) { this.rpcTcRequestTimeout = rpcTcRequestTimeout; } + + public String getProtocol() { + return protocol; + } + + public void setProtocol(String protocol) { + this.protocol = protocol; + } } diff --git a/test/pom.xml b/test/pom.xml index 91d4c9f60fb..d35f25bad5e 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -40,6 +40,28 @@ true + + org.xolstice.maven.plugins + protobuf-maven-plugin + + ${project.basedir}/src/test/resources/protobuf/org/apache/seata/protocol/transcation/ + + com.google.protobuf:protoc:3.25.4:exe:${os.detected.classifier} + + grpc-java + + io.grpc:protoc-gen-grpc-java:1.66.0:exe:${os.detected.classifier} + + + + + + compile + compile-custom + + + + @@ -49,6 +71,10 @@ seata-tm ${project.version} + + io.grpc + grpc-alts + diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java new file mode 100644 index 00000000000..0d63d2eb70f --- /dev/null +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.mockserver; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.ConfigurationTestHelper; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.protocol.generated.GrpcMessageProto; +import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; +import org.apache.seata.core.rpc.netty.TmNettyRemotingClient; +import org.apache.seata.mockserver.MockServer; +import org.apache.seata.serializer.protobuf.generated.*; +import org.apache.seata.core.protocol.generated.SeataServiceGrpc; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class GrpcTest { + + private static ManagedChannel channel; + + private static SeataServiceGrpc.SeataServiceStub seataServiceStub; + + @BeforeAll + public static void before() { + ConfigurationFactory.reload(); + ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, String.valueOf(ProtocolTestConstants.MOCK_SERVER_PORT)); + MockServer.start(ProtocolTestConstants.MOCK_SERVER_PORT); + TmNettyRemotingClient.getInstance().destroy(); + RmNettyRemotingClient.getInstance().destroy(); + + channel = ManagedChannelBuilder.forAddress("127.0.0.1", ProtocolTestConstants.MOCK_SERVER_PORT).usePlaintext().build(); + seataServiceStub = SeataServiceGrpc.newStub(channel); + } + + @AfterAll + public static void after() { + //MockServer.close(); + ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL); + TmNettyRemotingClient.getInstance().destroy(); + RmNettyRemotingClient.getInstance().destroy(); + } + + private GrpcMessageProto getRegisterTMRequest() { + AbstractIdentifyRequestProto abstractIdentifyRequestProto = AbstractIdentifyRequestProto.newBuilder() + .setApplicationId("test-applicationId") + .build(); + RegisterTMRequestProto registerTMRequestProto = RegisterTMRequestProto.newBuilder() + .setAbstractIdentifyRequest(abstractIdentifyRequestProto) + .build(); + + return GrpcMessageProto.newBuilder().setBody(registerTMRequestProto.toByteString()).build(); + } + + private GrpcMessageProto getGlobalBeginRequest() { + GlobalBeginRequestProto globalBeginRequestProto = GlobalBeginRequestProto.newBuilder() + .setTransactionName("test-transaction") + .setTimeout(2000) + .build(); + return GrpcMessageProto.newBuilder().setBody(globalBeginRequestProto.toByteString()).build(); + } + + private GrpcMessageProto getBranchRegisterRequest() { + BranchRegisterRequestProto branchRegisterRequestProto = BranchRegisterRequestProto.newBuilder() + .setXid("1") + .setLockKey("1") + .setResourceId("test-resource") + .setBranchType(BranchTypeProto.TCC) + .setApplicationData("{\"mock\":\"mock\"}") + .build(); + + return GrpcMessageProto.newBuilder().setBody(branchRegisterRequestProto.toByteString()).build(); + } + + private GrpcMessageProto getGlobalCommitRequest() { + AbstractGlobalEndRequestProto globalEndRequestProto = AbstractGlobalEndRequestProto.newBuilder() + .setXid("1") + .build(); + GlobalCommitRequestProto globalCommitRequestProto = GlobalCommitRequestProto.newBuilder() + .setAbstractGlobalEndRequest(globalEndRequestProto) + .build(); + + return GrpcMessageProto.newBuilder().setBody(globalCommitRequestProto.toByteString()).build(); + } + + private GrpcMessageProto getGlobalRollbackRequest() { + AbstractGlobalEndRequestProto globalEndRequestProto = AbstractGlobalEndRequestProto.newBuilder() + .setXid("1") + .build(); + GlobalRollbackRequestProto globalRollbackRequestProto = GlobalRollbackRequestProto.newBuilder() + .setAbstractGlobalEndRequest(globalEndRequestProto) + .build(); + + return GrpcMessageProto.newBuilder().setBody(globalRollbackRequestProto.toByteString()).build(); + } + + @Test + public void testCommit() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(4); + StreamObserver streamObserver = new StreamObserver() { + @Override + public void onNext(GrpcMessageProto grpcMessageProto) { + System.out.println("receive : " + grpcMessageProto.toString()); + countDownLatch.countDown(); + } + + @Override + public void onError(Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void onCompleted() { + + } + }; + + StreamObserver response = seataServiceStub.sendRequest(streamObserver); + response.onNext(getRegisterTMRequest()); + response.onNext(getGlobalBeginRequest()); + response.onNext(getBranchRegisterRequest()); + response.onNext(getGlobalCommitRequest()); + + response.onCompleted(); + + countDownLatch.await(10, TimeUnit.SECONDS); + } + + @Test + public void testRollback() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(4); + StreamObserver streamObserver = new StreamObserver() { + @Override + public void onNext(GrpcMessageProto grpcMessageProto) { + System.out.println("receive : " + grpcMessageProto.toString()); + countDownLatch.countDown(); + } + + @Override + public void onError(Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void onCompleted() { + + } + }; + + StreamObserver response = seataServiceStub.sendRequest(streamObserver); + response.onNext(getRegisterTMRequest()); + response.onNext(getGlobalBeginRequest()); + response.onNext(getBranchRegisterRequest()); + response.onNext(getGlobalRollbackRequest()); + + response.onCompleted(); + + countDownLatch.await(10, TimeUnit.SECONDS); + } +} diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java new file mode 100644 index 00000000000..3744ddd270f --- /dev/null +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.mockserver; + +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.ConfigurationTestHelper; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.core.model.TransactionManager; +import org.apache.seata.core.protocol.Protocol; +import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; +import org.apache.seata.core.rpc.netty.TmNettyRemotingClient; +import org.apache.seata.mockserver.MockCoordinator; +import org.apache.seata.mockserver.MockServer; +import org.apache.seata.rm.DefaultResourceManager; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * the type MockServerTest + */ +public class MockGrpcServerTest { + + static String RESOURCE_ID = "mock-action"; + + Logger logger = LoggerFactory.getLogger(MockGrpcServerTest.class); + + @BeforeAll + public static void before() { + ConfigurationFactory.reload(); + ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, String.valueOf(ProtocolTestConstants.MOCK_SERVER_PORT)); + ConfigurationTestHelper.putConfig(ConfigurationKeys.TRANSPORT_PROTOCOL, Protocol.GPRC.value); + MockServer.start(ProtocolTestConstants.MOCK_SERVER_PORT); + TmNettyRemotingClient.getInstance().destroy(); + RmNettyRemotingClient.getInstance().destroy(); + } + + @AfterAll + public static void after() { + //MockServer.close(); + ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL); + ConfigurationTestHelper.removeConfig(ConfigurationKeys.TRANSPORT_PROTOCOL); + TmNettyRemotingClient.getInstance().destroy(); + RmNettyRemotingClient.getInstance().destroy(); + } + + @Test + public void testCommit() throws TransactionException { + String xid = doTestCommit(0); + Assertions.assertEquals(1, Action1Impl.getCommitTimes(xid)); + Assertions.assertEquals(0, Action1Impl.getRollbackTimes(xid)); + } + + @Test + public void testCommitRetry() throws TransactionException { + String xid = doTestCommit(2); + Assertions.assertEquals(3, Action1Impl.getCommitTimes(xid)); + Assertions.assertEquals(0, Action1Impl.getRollbackTimes(xid)); + } + + @Test + public void testRollback() throws TransactionException { + String xid = doTestRollback(0); + Assertions.assertEquals(0, Action1Impl.getCommitTimes(xid)); + Assertions.assertEquals(1, Action1Impl.getRollbackTimes(xid)); + } + + @Test + public void testRollbackRetry() throws TransactionException { + String xid = doTestRollback(2); + Assertions.assertEquals(0, Action1Impl.getCommitTimes(xid)); + Assertions.assertEquals(3, Action1Impl.getRollbackTimes(xid)); + } + + private String doTestCommit(int times) throws TransactionException { + TransactionManager tm = TmClientTest.getTm(); + DefaultResourceManager rm = RmClientTest.getRm(RESOURCE_ID); + + String xid = tm.begin(ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP, "test-commit", 60000); + MockCoordinator.getInstance().setExpectedRetry(xid, times); + Long branchId = rm.branchRegister(BranchType.TCC, RESOURCE_ID, "1", xid, "{\"mock\":\"mock\"}", "1"); + GlobalStatus commit = tm.commit(xid); + Assertions.assertEquals(GlobalStatus.Committed, commit); + return xid; + } + + private String doTestRollback(int times) throws TransactionException { + TransactionManager tm = TmClientTest.getTm(); + DefaultResourceManager rm = RmClientTest.getRm(RESOURCE_ID); + + String xid = tm.begin(ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP, "test-rollback", 60000); + logger.info("doTestRollback xid:{}", xid); + MockCoordinator.getInstance().setExpectedRetry(xid, times); + Long branchId = rm.branchRegister(BranchType.TCC, RESOURCE_ID, "1", xid, "{\"mock\":\"mock\"}", "1"); + GlobalStatus rollback = tm.rollback(xid); + Assertions.assertEquals(GlobalStatus.Rollbacked, rollback); + return xid; + + } +} diff --git a/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto b/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto new file mode 100644 index 00000000000..dd61bd95f48 --- /dev/null +++ b/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; +package org.apache.seata.protocol.protobuf; +option java_multiple_files = true; +option java_outer_classname = "GrpcMessage"; +option java_package = "org.apache.seata.core.protocol.generated"; + +message GrpcMessageProto { + int32 id = 1; + int32 messageType = 2; + map headMap = 3; + bytes body = 4; +} + +service SeataService { + rpc sendRequest (stream GrpcMessageProto) returns (stream GrpcMessageProto); +} \ No newline at end of file