From 41733d039353212d00859aa4b47c410c00a11200 Mon Sep 17 00:00:00 2001 From: yiqi <77573225+PleaseGiveMeTheCoke@users.noreply.github.com> Date: Thu, 26 Sep 2024 22:20:18 +0800 Subject: [PATCH 01/21] Grpc support (#3) * grpc protocol * grpc protocol * grpc protocol * grpc protocol * grpc protocol * delete no need change * cr * 1. mock test 2. support multi-version * cr * cr * Delete seata-test-grpc/src/main/java/org/apache/seata/grpc/generated/GrpcMessageProto.java * Delete seata-test-grpc/src/main/java/org/apache/seata/grpc/generated/GrpcMessage.java * Delete seata-test-grpc/src/main/java/org/apache/seata/grpc/generated/SeataServiceGrpc.java * Delete serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java * Delete seata-test-grpc/src/main/java/org/apache/seata/grpc/generated/GrpcMessageProtoOrBuilder.java * cr * cr * cr * cr * cr * cr * cr * cr * cr * cr * cr * cr * cr * cr * cr * cr * optimize: update grpc and protocol version * Before casting, check if the value is within the range of the target type. * Before casting, check if the value is within the range of the target type. * Remove duplicate code * import grpc-core * import grpc-core * bugfix * bugfix * bugfix: context release * bugfix: release byteBuf * opt --------- Co-authored-by: yiqi <1455432762@qq.com> --- .../seata/common/ConfigurationKeys.java | 2 + .../apache/seata/common/DefaultValues.java | 1 + core/pom.xml | 21 ++ .../apache/seata/core/protocol/Protocol.java | 39 ++++ .../core/protocol/detector/Http2Detector.java | 66 +++++++ .../protocol/detector/ProtocolDetector.java | 26 +++ .../core/protocol/detector/SeataDetector.java | 50 +++++ .../core/rpc/netty/NettyClientBootstrap.java | 45 ++++- .../core/rpc/netty/NettyClientConfig.java | 5 + .../core/rpc/netty/NettyServerBootstrap.java | 10 +- .../core/rpc/netty/ProtocolDetectHandler.java | 62 ++++++ .../core/rpc/netty/grpc/GrpcDecoder.java | 104 ++++++++++ .../core/rpc/netty/grpc/GrpcEncoder.java | 86 +++++++++ .../core/rpc/netty/grpc/GrpcHeaderEnum.java | 39 ++++ .../serializer/SerializerServiceLoader.java | 28 ++- .../seata/core/serializer/SerializerType.java | 7 + .../protocol/transcation/grpcMessage.proto | 33 ++++ dependencies/pom.xml | 6 +- .../properties/TransportProperties.java | 11 ++ .../serializer/protobuf/GrpcSerializer.java | 60 ++++++ ...rg.apache.seata.core.serializer.Serializer | 3 +- test/pom.xml | 26 +++ .../core/rpc/netty/mockserver/GrpcTest.java | 181 ++++++++++++++++++ .../protocol/transcation/grpcMessage.proto | 33 ++++ 24 files changed, 930 insertions(+), 14 deletions(-) create mode 100644 core/src/main/java/org/apache/seata/core/protocol/Protocol.java create mode 100644 core/src/main/java/org/apache/seata/core/protocol/detector/Http2Detector.java create mode 100644 core/src/main/java/org/apache/seata/core/protocol/detector/ProtocolDetector.java create mode 100644 core/src/main/java/org/apache/seata/core/protocol/detector/SeataDetector.java create mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java create mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java create mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java create mode 100644 core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java create mode 100644 core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto create mode 100644 serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java create mode 100644 test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java create mode 100644 test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index dec76f51b7c..ff8436b6dc9 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -628,6 +628,8 @@ public interface ConfigurationKeys { @Deprecated String ENABLE_CLIENT_BATCH_SEND_REQUEST = TRANSPORT_PREFIX + "enableClientBatchSendRequest"; + String TRANSPORT_PROTOCOL = TRANSPORT_PREFIX + "protocol"; + /** * The constant ENABLE_TM_CLIENT_BATCH_SEND_REQUEST */ diff --git a/common/src/main/java/org/apache/seata/common/DefaultValues.java b/common/src/main/java/org/apache/seata/common/DefaultValues.java index 8c484a1ab0f..eb0d40bb308 100644 --- a/common/src/main/java/org/apache/seata/common/DefaultValues.java +++ b/common/src/main/java/org/apache/seata/common/DefaultValues.java @@ -63,6 +63,7 @@ public interface DefaultValues { String DEFAULT_BOSS_THREAD_PREFIX = "NettyBoss"; String DEFAULT_NIO_WORKER_THREAD_PREFIX = "NettyServerNIOWorker"; String DEFAULT_EXECUTOR_THREAD_PREFIX = "NettyServerBizHandler"; + String DEFAULT_PROTOCOL = "seata"; boolean DEFAULT_TRANSPORT_HEARTBEAT = true; boolean DEFAULT_TRANSACTION_UNDO_DATA_VALIDATION = true; diff --git a/core/pom.xml b/core/pom.xml index 9de6107bc03..26ce3dc9018 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -69,6 +69,10 @@ fastjson test + + com.google.protobuf + protobuf-java + @@ -90,6 +94,23 @@ + + org.xolstice.maven.plugins + protobuf-maven-plugin + + ${project.basedir}/src/main/resources/protobuf/org/apache/seata/protocol/transcation/ + + com.google.protobuf:protoc:3.25.4:exe:${os.detected.classifier} + + + + + + compile + + + + diff --git a/core/src/main/java/org/apache/seata/core/protocol/Protocol.java b/core/src/main/java/org/apache/seata/core/protocol/Protocol.java new file mode 100644 index 00000000000..fe3cc000cfc --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/protocol/Protocol.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.protocol; + +/** + * seata transport protocol + */ +public enum Protocol { + + /** + * grpc + */ + GPRC("grpc"), + + /** + * seata + */ + SEATA("seata"); + + public final String value; + + Protocol(String value) { + this.value = value; + } +} diff --git a/core/src/main/java/org/apache/seata/core/protocol/detector/Http2Detector.java b/core/src/main/java/org/apache/seata/core/protocol/detector/Http2Detector.java new file mode 100644 index 00000000000..a004894f9b1 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/protocol/detector/Http2Detector.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.protocol.detector; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2StreamChannel; +import io.netty.util.CharsetUtil; +import org.apache.seata.core.rpc.netty.grpc.GrpcDecoder; +import org.apache.seata.core.rpc.netty.grpc.GrpcEncoder; + +public class Http2Detector implements ProtocolDetector { + private static final byte[] HTTP2_PREFIX_BYTES = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(CharsetUtil.UTF_8); + private ChannelHandler[] serverHandlers; + + public Http2Detector(ChannelHandler[] serverHandlers) { + this.serverHandlers = serverHandlers; + } + + @Override + public boolean detect(ByteBuf in) { + if (in.readableBytes() < HTTP2_PREFIX_BYTES.length) { + return false; + } + for (int i = 0; i < HTTP2_PREFIX_BYTES.length; i++) { + if (in.getByte(i) != HTTP2_PREFIX_BYTES[i]) { + return false; + } + } + return true; + } + + @Override + public ChannelHandler[] getHandlers() { + return new ChannelHandler[]{ + Http2FrameCodecBuilder.forServer().build(), + new Http2MultiplexHandler(new ChannelInitializer() { + @Override + protected void initChannel(Http2StreamChannel ch) { + final ChannelPipeline p = ch.pipeline(); + p.addLast(new GrpcDecoder()); + p.addLast(new GrpcEncoder()); + p.addLast(serverHandlers); + } + }) + }; + } +} \ No newline at end of file diff --git a/core/src/main/java/org/apache/seata/core/protocol/detector/ProtocolDetector.java b/core/src/main/java/org/apache/seata/core/protocol/detector/ProtocolDetector.java new file mode 100644 index 00000000000..89d5d10e7be --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/protocol/detector/ProtocolDetector.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.protocol.detector; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; + +public interface ProtocolDetector { + boolean detect(ByteBuf in); + + ChannelHandler[] getHandlers(); +} diff --git a/core/src/main/java/org/apache/seata/core/protocol/detector/SeataDetector.java b/core/src/main/java/org/apache/seata/core/protocol/detector/SeataDetector.java new file mode 100644 index 00000000000..b9c30b0bc6d --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/protocol/detector/SeataDetector.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.protocol.detector; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import org.apache.seata.core.rpc.netty.MultiProtocolDecoder; + +public class SeataDetector implements ProtocolDetector { + private static final byte[] MAGIC_CODE_BYTES = {(byte) 0xda, (byte) 0xda}; + private ChannelHandler[] serverHandlers; + + public SeataDetector(ChannelHandler[] serverHandlers) { + this.serverHandlers = serverHandlers; + } + + @Override + public boolean detect(ByteBuf in) { + if (in.readableBytes() < MAGIC_CODE_BYTES.length) { + return false; + } + for (int i = 0; i < MAGIC_CODE_BYTES.length; i++) { + if (in.getByte(i) != MAGIC_CODE_BYTES[i]) { + return false; + } + } + return true; + } + + @Override + public ChannelHandler[] getHandlers() { + MultiProtocolDecoder multiProtocolDecoder = new MultiProtocolDecoder(serverHandlers); + + return new ChannelHandler[]{multiProtocolDecoder}; + } +} \ No newline at end of file diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java index 4aaafc0acb0..68ee02d71c0 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java @@ -18,8 +18,11 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; @@ -28,13 +31,19 @@ import io.netty.channel.epoll.EpollMode; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2StreamChannelBootstrap; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.internal.PlatformDependent; import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.core.protocol.Protocol; import org.apache.seata.core.rpc.RemotingBootstrap; +import org.apache.seata.core.rpc.netty.grpc.GrpcDecoder; +import org.apache.seata.core.rpc.netty.grpc.GrpcEncoder; import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1; import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1; import org.slf4j.Logger; @@ -130,12 +139,19 @@ public void start() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); - pipeline - .addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), + pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), nettyClientConfig.getChannelMaxWriteIdleSeconds(), - nettyClientConfig.getChannelMaxAllIdleSeconds())) - .addLast(new ProtocolDecoderV1()) - .addLast(new ProtocolEncoderV1()); + nettyClientConfig.getChannelMaxAllIdleSeconds())); + if (nettyClientConfig.getProtocol().equals(Protocol.GPRC.value)) { + pipeline.addLast(Http2FrameCodecBuilder.forClient().build()) + .addLast(new Http2MultiplexHandler(new ChannelDuplexHandler())) + .addLast(new GrpcDecoder()) + .addLast(new GrpcEncoder()); + } else { + pipeline.addLast(new ProtocolDecoderV1()) + .addLast(new ProtocolEncoderV1()); + } + if (channelHandlers != null) { addChannelPipelineLast(ch, channelHandlers); } @@ -177,9 +193,28 @@ public Channel getNewChannel(InetSocketAddress address) { } else { channel = f.channel(); } + + // TODO tmp only for grpc + if (nettyClientConfig.getProtocol().equals(Protocol.GPRC.value)) { + Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(channel); + bootstrap.handler(new ChannelInboundHandlerAdapter() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + channel.pipeline().addLast(new GrpcDecoder()); + channel.pipeline().addLast(new GrpcEncoder()); + if (channelHandlers != null) { + addChannelPipelineLast(channel, channelHandlers); + } + } + }); + channel = bootstrap.open().get(); + } + } catch (Exception e) { throw new FrameworkException(e, "can not connect to services-server."); } + return channel; } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java index f0e047ad58d..68583608262 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java @@ -21,6 +21,7 @@ import org.apache.seata.core.rpc.TransportServerType; import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST; +import static org.apache.seata.common.DefaultValues.DEFAULT_PROTOCOL; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_RM_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TM_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_SELECTOR_THREAD_PREFIX; @@ -451,6 +452,10 @@ public String getRmDispatchThreadPrefix() { return RPC_DISPATCH_THREAD_PREFIX + "_" + NettyPoolKey.TransactionRole.RMROLE.name(); } + public String getProtocol() { + return CONFIG.getConfig(org.apache.seata.common.ConfigurationKeys.TRANSPORT_PROTOCOL, DEFAULT_PROTOCOL); + } + @Deprecated public static boolean isEnableClientBatchSendRequest() { return ENABLE_CLIENT_BATCH_SEND_REQUEST; diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java index c7b2aa57c21..b589396e5ab 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java @@ -92,6 +92,10 @@ protected void setChannelHandlers(final ChannelHandler... handlers) { } } + protected ChannelHandler[] getChannelHandlers() { + return channelHandlers; + } + /** * Add channel pipeline last. * @@ -158,10 +162,8 @@ public void start() { .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) { - MultiProtocolDecoder multiProtocolDecoder = new MultiProtocolDecoder(channelHandlers); - ch.pipeline() - .addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) - .addLast(multiProtocolDecoder); + ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) + .addLast(new ProtocolDetectHandler(NettyServerBootstrap.this)); } }); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java new file mode 100644 index 00000000000..9f1b5f8c113 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.seata.core.protocol.detector.Http2Detector; +import org.apache.seata.core.protocol.detector.ProtocolDetector; +import org.apache.seata.core.protocol.detector.SeataDetector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class ProtocolDetectHandler extends ByteToMessageDecoder { + private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDetectHandler.class); + private NettyServerBootstrap nettyServerBootstrap; + private ProtocolDetector[] supportedProtocolDetectors; + + public ProtocolDetectHandler(NettyServerBootstrap nettyServerBootstrap) { + this.nettyServerBootstrap = nettyServerBootstrap; + this.supportedProtocolDetectors = new ProtocolDetector[]{new Http2Detector(nettyServerBootstrap.getChannelHandlers()), new SeataDetector(nettyServerBootstrap.getChannelHandlers())}; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + for (ProtocolDetector protocolDetector : supportedProtocolDetectors) { + if (protocolDetector.detect(in)) { + ChannelHandler[] protocolHandlers = protocolDetector.getHandlers(); + ctx.pipeline().addLast(protocolHandlers); + ctx.pipeline().remove(this); + + in.resetReaderIndex(); + return; + } + + in.resetReaderIndex(); + } + + byte[] preface = new byte[in.readableBytes()]; + in.readBytes(preface); + LOGGER.error("Can not recognize protocol from remote {}, preface = {}", ctx.channel().remoteAddress(), preface); + in.clear(); + ctx.close(); + } +} \ No newline at end of file diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java new file mode 100644 index 00000000000..bcde5702eb3 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.grpc; + +import com.google.protobuf.Any; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2HeadersFrame; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.generated.GrpcMessageProto; +import org.apache.seata.core.serializer.Serializer; +import org.apache.seata.core.serializer.SerializerServiceLoader; +import org.apache.seata.core.serializer.SerializerType; + +public class GrpcDecoder extends ChannelDuplexHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Http2HeadersFrame) { + onHeadersRead(ctx, (Http2HeadersFrame) msg); + } else if (msg instanceof Http2DataFrame) { + onDataRead(ctx, (Http2DataFrame) msg); + } else if (msg instanceof ReferenceCounted) { + ReferenceCountUtil.release(msg); + } + } + + public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exception { + ByteBuf content = msg.content(); + try { + int readableBytes = content.readableBytes(); + byte[] bytes = new byte[readableBytes]; + content.readBytes(bytes); + if (bytes.length < 5) { + return; + } + + int srcPos = 0; + while (srcPos < readableBytes) { + // The first byte defaults to 0, indicating that no decompression is required + // Read the value of the next four bytes as the length of the body + int length = ((bytes[srcPos + 1] & 0xFF) << 24) | ((bytes[srcPos + 2] & 0xFF) << 16) + | ((bytes[srcPos + 3] & 0xFF) << 8) | (bytes[srcPos + 4] & 0xFF); + + byte[] data = new byte[length]; + System.arraycopy(bytes, srcPos + 5, data, 0, length); + + GrpcMessageProto grpcMessageProto = GrpcMessageProto.parseFrom(data); + Any body = grpcMessageProto.getBody(); + int messageType = safeCastToInt(grpcMessageProto.getMessageType()); + int messageId = safeCastToInt(grpcMessageProto.getId()); + byte[] byteArray = body.toByteArray(); + + Serializer serializer = + SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.GRPC.getCode()), (byte)0); + Object messageBody = serializer.deserialize(byteArray); + + RpcMessage rpcMsg = new RpcMessage(); + rpcMsg.setMessageType((byte)messageType); + rpcMsg.setBody(messageBody); + rpcMsg.setId(messageId); + rpcMsg.setHeadMap(grpcMessageProto.getHeadMapMap()); + + ctx.fireChannelRead(rpcMsg); + + srcPos += length + 5; + } + } finally { + ReferenceCountUtil.release(content); + } + } + + + public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame headersFrame) throws Exception { + // TODO Subsequent decompression logic is possible + } + + private int safeCastToInt(long value) { + if (value >= Integer.MIN_VALUE && value <= Integer.MAX_VALUE) { + return (int)value; + } else { + throw new IllegalArgumentException("Value exceeds int range: " + value); + } + } + +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java new file mode 100644 index 00000000000..523f057dc67 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.grpc; + +import com.google.protobuf.Any; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.Http2Headers; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.generated.GrpcMessageProto; +import org.apache.seata.core.serializer.Serializer; +import org.apache.seata.core.serializer.SerializerServiceLoader; +import org.apache.seata.core.serializer.SerializerType; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class GrpcEncoder extends ChannelOutboundHandlerAdapter { + private final AtomicBoolean headerSent = new AtomicBoolean(false); + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (!(msg instanceof RpcMessage)) { + throw new UnsupportedOperationException("GrpcEncoder not support class:" + msg.getClass()); + } + + RpcMessage rpcMessage = (RpcMessage) msg; + byte messageType = rpcMessage.getMessageType(); + Map headMap = rpcMessage.getHeadMap(); + Object body = rpcMessage.getBody(); + int id = rpcMessage.getId(); + + if (headerSent.compareAndSet(false, true)) + { + Http2Headers headers = new DefaultHttp2Headers(); + headers.add(GrpcHeaderEnum.HTTP2_STATUS.header, String.valueOf(200)); + headers.add(GrpcHeaderEnum.GRPC_STATUS.header, String.valueOf(0)); + headers.add(GrpcHeaderEnum.GRPC_CONTENT_TYPE.header, "application/grpc"); + ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers)); + } + + Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.GRPC.getCode()), (byte) 0); + Any messageBody = Any.parseFrom(serializer.serialize(body)); + GrpcMessageProto grpcMessageProto = GrpcMessageProto.newBuilder() + .setBody(messageBody) + .putAllHeadMap(headMap) + .setMessageType(messageType) + .setId(id).build(); + byte[] bodyBytes = grpcMessageProto.toByteArray(); + if (bodyBytes != null) + { + byte[] messageWithPrefix = new byte[bodyBytes.length + 5]; + // The first byte is 0, indicating no compression + messageWithPrefix[0] = 0; + ByteBuffer buffer = ByteBuffer.allocate(4); + buffer.putInt(bodyBytes.length); + byte[] lengthBytes = buffer.array(); + // The last four bytes indicate the length + System.arraycopy(lengthBytes, 0, messageWithPrefix, 1, 4); + // The remaining bytes are body + System.arraycopy(bodyBytes, 0, messageWithPrefix, 5, bodyBytes.length); + ctx.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.wrappedBuffer(messageWithPrefix))); + } + } + +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java new file mode 100644 index 00000000000..2a803cedbec --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.grpc; + +public enum GrpcHeaderEnum { + + /** + * grpc status + */ + GRPC_STATUS("grpc-status"), + /** + * http2 status + */ + HTTP2_STATUS(":status"), + /** + * content-type + */ + GRPC_CONTENT_TYPE("content-type"); + + public final String header; + + GrpcHeaderEnum(String header) { + this.header = header; + } +} diff --git a/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java b/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java index 63ce440edd1..0aa9bd340e8 100644 --- a/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java +++ b/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java @@ -72,8 +72,7 @@ public static Serializer load(SerializerType type, byte version) throws Enhanced "Please manually reference 'org.apache.seata:seata-serializer-protobuf' dependency."); } - - String key = serialzerKey(type, version); + String key = serializerKey(type, version); Serializer serializer = SERIALIZER_MAP.get(key); if (serializer == null) { if (type == SerializerType.SEATA) { @@ -86,7 +85,30 @@ public static Serializer load(SerializerType type, byte version) throws Enhanced return serializer; } - private static String serialzerKey(SerializerType type, byte version) { + /** + * Load the service of {@link Serializer} + * + * @param type the serializer type + * @return the service of {@link Serializer} + * @throws EnhancedServiceNotFoundException the enhanced service not found exception + */ + public static Serializer load(SerializerType type) throws EnhancedServiceNotFoundException { + if (type == SerializerType.PROTOBUF && !CONTAINS_PROTOBUF_DEPENDENCY) { + throw new EnhancedServiceNotFoundException("The class '" + PROTOBUF_SERIALIZER_CLASS_NAME + "' not found. " + + "Please manually reference 'org.apache.seata:seata-serializer-protobuf' dependency."); + } + + String key = type.name(); + Serializer serializer = SERIALIZER_MAP.get(key); + if (serializer == null) { + serializer = EnhancedServiceLoader.load(Serializer.class, type.name()); + + SERIALIZER_MAP.put(key, serializer); + } + return serializer; + } + + private static String serializerKey(SerializerType type, byte version) { if (type == SerializerType.SEATA) { return type.name() + version; } diff --git a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java index c60067e72bc..8c5d21f68f5 100644 --- a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java +++ b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java @@ -63,6 +63,13 @@ public enum SerializerType { * Math.pow(2, 5) */ JACKSON((byte)0x32), + + /** + * The grpc. + *

+ * Math.pow(2, 6) + */ + GRPC((byte)0x64) ; private final byte code; diff --git a/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto b/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto new file mode 100644 index 00000000000..cdf1b52f4a8 --- /dev/null +++ b/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; +package org.apache.seata.protocol.protobuf; +import "google/protobuf/any.proto"; +option java_multiple_files = true; +option java_outer_classname = "GrpcMessage"; +option java_package = "org.apache.seata.core.protocol.generated"; + +message GrpcMessageProto { + int32 id = 1; + int32 messageType = 2; + map headMap = 3; + google.protobuf.Any body = 4; +} + +service SeataService { + rpc sendRequest (stream GrpcMessageProto) returns (stream GrpcMessageProto); +} \ No newline at end of file diff --git a/dependencies/pom.xml b/dependencies/pom.xml index 4bf0450d4f3..15178f14166 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -76,7 +76,6 @@ 4.1.101.Final 4.0.3 1.6.7 - 3.25.4 1.66.0 5.4.0 @@ -623,6 +622,11 @@ grpc-core ${grpc.version} + + io.grpc + grpc-alts + ${grpc.version} + io.grpc grpc-api diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java index 8b4caa4e8f6..64f38a385d0 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java @@ -23,6 +23,7 @@ import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST; import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_TC_SERVER_BATCH_SEND_RESPONSE; import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST; +import static org.apache.seata.common.DefaultValues.DEFAULT_PROTOCOL; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_RM_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TC_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TM_REQUEST_TIMEOUT; @@ -54,6 +55,8 @@ public class TransportProperties { */ private String compressor = "none"; + private String protocol = DEFAULT_PROTOCOL; + /** * enable client batch send request */ @@ -193,4 +196,12 @@ public long getRpcTcRequestTimeout() { public void setRpcTcRequestTimeout(long rpcTcRequestTimeout) { this.rpcTcRequestTimeout = rpcTcRequestTimeout; } + + public String getProtocol() { + return protocol; + } + + public void setProtocol(String protocol) { + this.protocol = protocol; + } } diff --git a/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java b/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java new file mode 100644 index 00000000000..2ef8eac784e --- /dev/null +++ b/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.serializer.protobuf; + +import com.google.protobuf.Any; +import com.google.protobuf.Message; +import org.apache.seata.common.exception.ShouldNeverHappenException; +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.core.serializer.Serializer; +import org.apache.seata.serializer.protobuf.convertor.PbConvertor; +import org.apache.seata.serializer.protobuf.manager.ProtobufConvertManager; + +@LoadLevel(name = "GRPC") +public class GrpcSerializer implements Serializer { + @Override + public byte[] serialize(T t) { + PbConvertor pbConvertor = ProtobufConvertManager.getInstance() + .fetchConvertor(t.getClass().getName()); + Any grpcBody = Any.pack((Message) pbConvertor.convert2Proto(t)); + + return grpcBody.toByteArray(); + } + + @Override + public T deserialize(byte[] bytes) { + try { + Any body = Any.parseFrom(bytes); + final Class clazz = ProtobufConvertManager.getInstance().fetchProtoClass(getTypeNameFromTypeUrl(body.getTypeUrl())); + if (body.is(clazz)) { + Object ob = body.unpack(clazz); + PbConvertor pbConvertor = ProtobufConvertManager.getInstance().fetchReversedConvertor(clazz.getName()); + + return (T) pbConvertor.convert2Model(ob); + } + } catch (Throwable e) { + throw new ShouldNeverHappenException("GrpcSerializer deserialize error", e); + } + + return null; + } + + private String getTypeNameFromTypeUrl(String typeUri) { + int pos = typeUri.lastIndexOf('/'); + return pos == -1 ? "" : typeUri.substring(pos + 1); + } +} diff --git a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer b/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer index 71098c53674..f6fbf709dea 100644 --- a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer +++ b/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer @@ -14,4 +14,5 @@ # See the License for the specific language governing permissions and # limitations under the License. # -org.apache.seata.serializer.protobuf.ProtobufSerializer \ No newline at end of file +org.apache.seata.serializer.protobuf.ProtobufSerializer +org.apache.seata.serializer.protobuf.GrpcSerializer \ No newline at end of file diff --git a/test/pom.xml b/test/pom.xml index 91d4c9f60fb..d35f25bad5e 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -40,6 +40,28 @@ true + + org.xolstice.maven.plugins + protobuf-maven-plugin + + ${project.basedir}/src/test/resources/protobuf/org/apache/seata/protocol/transcation/ + + com.google.protobuf:protoc:3.25.4:exe:${os.detected.classifier} + + grpc-java + + io.grpc:protoc-gen-grpc-java:1.66.0:exe:${os.detected.classifier} + + + + + + compile + compile-custom + + + + @@ -49,6 +71,10 @@ seata-tm ${project.version} + + io.grpc + grpc-alts + diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java new file mode 100644 index 00000000000..581a0e15f2d --- /dev/null +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.mockserver; + +import com.google.protobuf.Any; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.ConfigurationTestHelper; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.protocol.generated.GrpcMessageProto; +import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; +import org.apache.seata.core.rpc.netty.TmNettyRemotingClient; +import org.apache.seata.mockserver.MockServer; +import org.apache.seata.serializer.protobuf.generated.*; +import org.apache.seata.core.protocol.generated.SeataServiceGrpc; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class GrpcTest { + + private static ManagedChannel channel; + + private static SeataServiceGrpc.SeataServiceStub seataServiceStub; + + @BeforeAll + public static void before() { + ConfigurationFactory.reload(); + ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, String.valueOf(ProtocolTestConstants.MOCK_SERVER_PORT)); + MockServer.start(ProtocolTestConstants.MOCK_SERVER_PORT); + TmNettyRemotingClient.getInstance().destroy(); + RmNettyRemotingClient.getInstance().destroy(); + + channel = ManagedChannelBuilder.forAddress("127.0.0.1", ProtocolTestConstants.MOCK_SERVER_PORT).usePlaintext().build(); + seataServiceStub = SeataServiceGrpc.newStub(channel); + } + + @AfterAll + public static void after() { + //MockServer.close(); + ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL); + TmNettyRemotingClient.getInstance().destroy(); + RmNettyRemotingClient.getInstance().destroy(); + } + + private GrpcMessageProto getRegisterTMRequest() { + AbstractIdentifyRequestProto abstractIdentifyRequestProto = AbstractIdentifyRequestProto.newBuilder() + .setApplicationId("test-applicationId") + .build(); + RegisterTMRequestProto registerTMRequestProto = RegisterTMRequestProto.newBuilder() + .setAbstractIdentifyRequest(abstractIdentifyRequestProto) + .build(); + + return GrpcMessageProto.newBuilder().setBody(Any.pack(registerTMRequestProto)).build(); + } + + private GrpcMessageProto getGlobalBeginRequest() { + GlobalBeginRequestProto globalBeginRequestProto = GlobalBeginRequestProto.newBuilder() + .setTransactionName("test-transaction") + .setTimeout(2000) + .build(); + return GrpcMessageProto.newBuilder().setBody(Any.pack(globalBeginRequestProto)).build(); + } + + private GrpcMessageProto getBranchRegisterRequest() { + BranchRegisterRequestProto branchRegisterRequestProto = BranchRegisterRequestProto.newBuilder() + .setXid("1") + .setLockKey("1") + .setResourceId("test-resource") + .setBranchType(BranchTypeProto.TCC) + .setApplicationData("{\"mock\":\"mock\"}") + .build(); + + return GrpcMessageProto.newBuilder().setBody(Any.pack(branchRegisterRequestProto)).build(); + } + + private GrpcMessageProto getGlobalCommitRequest() { + AbstractGlobalEndRequestProto globalEndRequestProto = AbstractGlobalEndRequestProto.newBuilder() + .setXid("1") + .build(); + GlobalCommitRequestProto globalCommitRequestProto = GlobalCommitRequestProto.newBuilder() + .setAbstractGlobalEndRequest(globalEndRequestProto) + .build(); + + return GrpcMessageProto.newBuilder().setBody(Any.pack(globalCommitRequestProto)).build(); + } + + private GrpcMessageProto getGlobalRollbackRequest() { + AbstractGlobalEndRequestProto globalEndRequestProto = AbstractGlobalEndRequestProto.newBuilder() + .setXid("1") + .build(); + GlobalRollbackRequestProto globalRollbackRequestProto = GlobalRollbackRequestProto.newBuilder() + .setAbstractGlobalEndRequest(globalEndRequestProto) + .build(); + + return GrpcMessageProto.newBuilder().setBody(Any.pack(globalRollbackRequestProto)).build(); + } + + @Test + public void testCommit() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(4); + StreamObserver streamObserver = new StreamObserver() { + @Override + public void onNext(GrpcMessageProto grpcMessageProto) { + System.out.println("receive : " + grpcMessageProto.toString()); + countDownLatch.countDown(); + } + + @Override + public void onError(Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void onCompleted() { + + } + }; + + StreamObserver response = seataServiceStub.sendRequest(streamObserver); + response.onNext(getRegisterTMRequest()); + response.onNext(getGlobalBeginRequest()); + response.onNext(getBranchRegisterRequest()); + response.onNext(getGlobalCommitRequest()); + + response.onCompleted(); + + countDownLatch.await(10, TimeUnit.SECONDS); + } + + @Test + public void testRollback() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(4); + StreamObserver streamObserver = new StreamObserver() { + @Override + public void onNext(GrpcMessageProto grpcMessageProto) { + System.out.println("receive : " + grpcMessageProto.toString()); + countDownLatch.countDown(); + } + + @Override + public void onError(Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void onCompleted() { + + } + }; + + StreamObserver response = seataServiceStub.sendRequest(streamObserver); + response.onNext(getRegisterTMRequest()); + response.onNext(getGlobalBeginRequest()); + response.onNext(getBranchRegisterRequest()); + response.onNext(getGlobalRollbackRequest()); + + response.onCompleted(); + + countDownLatch.await(10, TimeUnit.SECONDS); + } +} diff --git a/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto b/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto new file mode 100644 index 00000000000..cdf1b52f4a8 --- /dev/null +++ b/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; +package org.apache.seata.protocol.protobuf; +import "google/protobuf/any.proto"; +option java_multiple_files = true; +option java_outer_classname = "GrpcMessage"; +option java_package = "org.apache.seata.core.protocol.generated"; + +message GrpcMessageProto { + int32 id = 1; + int32 messageType = 2; + map headMap = 3; + google.protobuf.Any body = 4; +} + +service SeataService { + rpc sendRequest (stream GrpcMessageProto) returns (stream GrpcMessageProto); +} \ No newline at end of file From 7020440bca4e0d3f79c324f479efa0364b126f4a Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Thu, 26 Sep 2024 22:24:57 +0800 Subject: [PATCH 02/21] idle --- .../core/rpc/netty/NettyClientBootstrap.java | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java index 68ee02d71c0..0fbd9ff0795 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java @@ -139,21 +139,18 @@ public void start() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), - nettyClientConfig.getChannelMaxWriteIdleSeconds(), - nettyClientConfig.getChannelMaxAllIdleSeconds())); if (nettyClientConfig.getProtocol().equals(Protocol.GPRC.value)) { pipeline.addLast(Http2FrameCodecBuilder.forClient().build()) - .addLast(new Http2MultiplexHandler(new ChannelDuplexHandler())) - .addLast(new GrpcDecoder()) - .addLast(new GrpcEncoder()); + .addLast(new Http2MultiplexHandler(new ChannelDuplexHandler())); } else { + pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), + nettyClientConfig.getChannelMaxWriteIdleSeconds(), + nettyClientConfig.getChannelMaxAllIdleSeconds())); pipeline.addLast(new ProtocolDecoderV1()) .addLast(new ProtocolEncoderV1()); - } - - if (channelHandlers != null) { - addChannelPipelineLast(ch, channelHandlers); + if (channelHandlers != null) { + addChannelPipelineLast(ch, channelHandlers); + } } } }); @@ -194,13 +191,15 @@ public Channel getNewChannel(InetSocketAddress address) { channel = f.channel(); } - // TODO tmp only for grpc if (nettyClientConfig.getProtocol().equals(Protocol.GPRC.value)) { Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(channel); bootstrap.handler(new ChannelInboundHandlerAdapter() { @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + public void handlerAdded(ChannelHandlerContext ctx) { Channel channel = ctx.channel(); + channel.pipeline().addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), + nettyClientConfig.getChannelMaxWriteIdleSeconds(), + nettyClientConfig.getChannelMaxAllIdleSeconds())); channel.pipeline().addLast(new GrpcDecoder()); channel.pipeline().addLast(new GrpcEncoder()); if (channelHandlers != null) { From d4ed24cc6b22af67ca4b0836b4b747782ae27d40 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Tue, 1 Oct 2024 10:11:32 +0800 Subject: [PATCH 03/21] codec --- .../core/rpc/netty/grpc/GrpcDecoder.java | 34 +++++++++++++------ .../core/rpc/netty/grpc/GrpcEncoder.java | 28 +++++++++------ .../core/rpc/netty/grpc/GrpcHeaderEnum.java | 7 +++- .../protocol/transcation/grpcMessage.proto | 3 +- 4 files changed, 48 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java index bcde5702eb3..1842ebac8ac 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java @@ -16,7 +16,6 @@ */ package org.apache.seata.core.rpc.netty.grpc; -import com.google.protobuf.Any; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; @@ -24,12 +23,16 @@ import io.netty.handler.codec.http2.Http2HeadersFrame; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; +import org.apache.seata.core.protocol.HeartbeatMessage; +import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.protocol.generated.GrpcMessageProto; import org.apache.seata.core.serializer.Serializer; import org.apache.seata.core.serializer.SerializerServiceLoader; import org.apache.seata.core.serializer.SerializerType; +import java.util.Map; + public class GrpcDecoder extends ChannelDuplexHandler { @Override @@ -62,23 +65,32 @@ public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exc byte[] data = new byte[length]; System.arraycopy(bytes, srcPos + 5, data, 0, length); - GrpcMessageProto grpcMessageProto = GrpcMessageProto.parseFrom(data); - Any body = grpcMessageProto.getBody(); - int messageType = safeCastToInt(grpcMessageProto.getMessageType()); - int messageId = safeCastToInt(grpcMessageProto.getId()); - byte[] byteArray = body.toByteArray(); - - Serializer serializer = - SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.GRPC.getCode()), (byte)0); - Object messageBody = serializer.deserialize(byteArray); + byte[] dataBytes = grpcMessageProto.getData().toByteArray(); + int messageType = grpcMessageProto.getMessageType(); + int messageId = grpcMessageProto.getId(); + Map headMap = grpcMessageProto.getHeadMapMap(); RpcMessage rpcMsg = new RpcMessage(); rpcMsg.setMessageType((byte)messageType); - rpcMsg.setBody(messageBody); rpcMsg.setId(messageId); rpcMsg.setHeadMap(grpcMessageProto.getHeadMapMap()); + if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) { + rpcMsg.setBody(HeartbeatMessage.PING); + } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { + rpcMsg.setBody(HeartbeatMessage.PONG); + }else { + SerializerType serializerType = SerializerType.PROTOBUF; + if (headMap.containsKey(GrpcHeaderEnum.CODEC_TYPE.header)) { + String codecValue = headMap.get(GrpcHeaderEnum.CODEC_TYPE.header); + serializerType = SerializerType.getByCode(Integer.parseInt(codecValue)); + } + Serializer serializer = SerializerServiceLoader.load(serializerType); + Object messageBody = serializer.deserialize(dataBytes); + rpcMsg.setBody(messageBody); + } + ctx.fireChannelRead(rpcMsg); srcPos += length + 5; diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java index 523f057dc67..784046d8f31 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java @@ -16,7 +16,7 @@ */ package org.apache.seata.core.rpc.netty.grpc; -import com.google.protobuf.Any; +import com.google.protobuf.ByteString; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; @@ -25,6 +25,7 @@ import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; import io.netty.handler.codec.http2.Http2Headers; +import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.protocol.generated.GrpcMessageProto; import org.apache.seata.core.serializer.Serializer; @@ -50,8 +51,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) Object body = rpcMessage.getBody(); int id = rpcMessage.getId(); - if (headerSent.compareAndSet(false, true)) - { + if (headerSent.compareAndSet(false, true)) { Http2Headers headers = new DefaultHttp2Headers(); headers.add(GrpcHeaderEnum.HTTP2_STATUS.header, String.valueOf(200)); headers.add(GrpcHeaderEnum.GRPC_STATUS.header, String.valueOf(0)); @@ -59,16 +59,24 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers)); } - Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.GRPC.getCode()), (byte) 0); - Any messageBody = Any.parseFrom(serializer.serialize(body)); - GrpcMessageProto grpcMessageProto = GrpcMessageProto.newBuilder() - .setBody(messageBody) + byte[] dataBytes = null; + if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST + && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { + Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec())); + dataBytes = serializer.serialize(body); + } + headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(rpcMessage.getCodec())); + GrpcMessageProto.Builder builder = GrpcMessageProto.newBuilder() .putAllHeadMap(headMap) .setMessageType(messageType) - .setId(id).build(); + .setId(id); + if (dataBytes != null) { + builder.setData(ByteString.copyFrom(dataBytes)); + } + GrpcMessageProto grpcMessageProto = builder.build(); + byte[] bodyBytes = grpcMessageProto.toByteArray(); - if (bodyBytes != null) - { + if (bodyBytes != null) { byte[] messageWithPrefix = new byte[bodyBytes.length + 5]; // The first byte is 0, indicating no compression messageWithPrefix[0] = 0; diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java index 2a803cedbec..d10eb7e1c70 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java @@ -29,7 +29,12 @@ public enum GrpcHeaderEnum { /** * content-type */ - GRPC_CONTENT_TYPE("content-type"); + GRPC_CONTENT_TYPE("content-type"), + + /** + * codec-type + */ + CODEC_TYPE("codec-type"); public final String header; diff --git a/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto b/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto index cdf1b52f4a8..6317a8518ca 100644 --- a/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto +++ b/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto @@ -16,7 +16,6 @@ */ syntax = "proto3"; package org.apache.seata.protocol.protobuf; -import "google/protobuf/any.proto"; option java_multiple_files = true; option java_outer_classname = "GrpcMessage"; option java_package = "org.apache.seata.core.protocol.generated"; @@ -25,7 +24,7 @@ message GrpcMessageProto { int32 id = 1; int32 messageType = 2; map headMap = 3; - google.protobuf.Any body = 4; + bytes data = 4; } service SeataService { From 82e104d55204715b101afa4f7461797bb8e3e315 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Tue, 1 Oct 2024 19:52:28 +0800 Subject: [PATCH 04/21] codec --- .../core/rpc/netty/grpc/GrpcDecoder.java | 23 ++++--- .../core/rpc/netty/grpc/GrpcEncoder.java | 6 +- .../core/rpc/netty/grpc/GrpcHeaderEnum.java | 7 ++- .../core/rpc/netty/v1/ProtocolEncoderV1.java | 1 - .../protocol/transcation/grpcMessage.proto | 2 +- .../serializer/protobuf/GrpcSerializer.java | 60 ------------------- ...rg.apache.seata.core.serializer.Serializer | 3 +- .../core/rpc/netty/mockserver/GrpcTest.java | 1 - .../protocol/transcation/grpcMessage.proto | 2 +- 9 files changed, 29 insertions(+), 76 deletions(-) delete mode 100644 serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java index 1842ebac8ac..9f95c72cc42 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java @@ -23,6 +23,8 @@ import io.netty.handler.codec.http2.Http2HeadersFrame; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; +import org.apache.seata.core.compressor.Compressor; +import org.apache.seata.core.compressor.CompressorFactory; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.protocol.RpcMessage; @@ -61,18 +63,18 @@ public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exc // The first byte defaults to 0, indicating that no decompression is required // Read the value of the next four bytes as the length of the body int length = ((bytes[srcPos + 1] & 0xFF) << 24) | ((bytes[srcPos + 2] & 0xFF) << 16) - | ((bytes[srcPos + 3] & 0xFF) << 8) | (bytes[srcPos + 4] & 0xFF); + | ((bytes[srcPos + 3] & 0xFF) << 8) | (bytes[srcPos + 4] & 0xFF); byte[] data = new byte[length]; System.arraycopy(bytes, srcPos + 5, data, 0, length); GrpcMessageProto grpcMessageProto = GrpcMessageProto.parseFrom(data); - byte[] dataBytes = grpcMessageProto.getData().toByteArray(); + byte[] bodyBytes = grpcMessageProto.getBody().toByteArray(); int messageType = grpcMessageProto.getMessageType(); int messageId = grpcMessageProto.getId(); Map headMap = grpcMessageProto.getHeadMapMap(); RpcMessage rpcMsg = new RpcMessage(); - rpcMsg.setMessageType((byte)messageType); + rpcMsg.setMessageType((byte) messageType); rpcMsg.setId(messageId); rpcMsg.setHeadMap(grpcMessageProto.getHeadMapMap()); @@ -80,14 +82,19 @@ public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exc rpcMsg.setBody(HeartbeatMessage.PING); } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { rpcMsg.setBody(HeartbeatMessage.PONG); - }else { - SerializerType serializerType = SerializerType.PROTOBUF; + } else { + SerializerType serializerType = SerializerType.SEATA; + if (headMap.containsKey(GrpcHeaderEnum.COMPRESS_TYPE.header)) { + Compressor compressor = CompressorFactory.getCompressor(rpcMsg.getCompressor()); + bodyBytes = compressor.decompress(bodyBytes); + } + if (headMap.containsKey(GrpcHeaderEnum.CODEC_TYPE.header)) { String codecValue = headMap.get(GrpcHeaderEnum.CODEC_TYPE.header); serializerType = SerializerType.getByCode(Integer.parseInt(codecValue)); } Serializer serializer = SerializerServiceLoader.load(serializerType); - Object messageBody = serializer.deserialize(dataBytes); + Object messageBody = serializer.deserialize(bodyBytes); rpcMsg.setBody(messageBody); } @@ -99,7 +106,7 @@ public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exc ReferenceCountUtil.release(content); } } - + public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame headersFrame) throws Exception { // TODO Subsequent decompression logic is possible @@ -107,7 +114,7 @@ public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame headersFr private int safeCastToInt(long value) { if (value >= Integer.MIN_VALUE && value <= Integer.MAX_VALUE) { - return (int)value; + return (int) value; } else { throw new IllegalArgumentException("Value exceeds int range: " + value); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java index 784046d8f31..c0b66b60d9f 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java @@ -25,6 +25,8 @@ import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; import io.netty.handler.codec.http2.Http2Headers; +import org.apache.seata.core.compressor.Compressor; +import org.apache.seata.core.compressor.CompressorFactory; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.protocol.generated.GrpcMessageProto; @@ -66,12 +68,14 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) dataBytes = serializer.serialize(body); } headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(rpcMessage.getCodec())); + headMap.put(GrpcHeaderEnum.COMPRESS_TYPE.header, String.valueOf(rpcMessage.getCompressor())); GrpcMessageProto.Builder builder = GrpcMessageProto.newBuilder() .putAllHeadMap(headMap) .setMessageType(messageType) .setId(id); if (dataBytes != null) { - builder.setData(ByteString.copyFrom(dataBytes)); + Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor()); + builder.setBody(ByteString.copyFrom(compressor.compress(dataBytes))); } GrpcMessageProto grpcMessageProto = builder.build(); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java index d10eb7e1c70..ed8e729b47d 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java @@ -34,7 +34,12 @@ public enum GrpcHeaderEnum { /** * codec-type */ - CODEC_TYPE("codec-type"); + CODEC_TYPE("codec-type"), + + /** + * compress-type + */ + COMPRESS_TYPE("compress-type"); public final String header; diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java index dd01b948dba..39180f3bdc0 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java @@ -66,7 +66,6 @@ public class ProtocolEncoderV1 extends MessageToByteEncoder implements ProtocolE public void encode(RpcMessage message, ByteBuf out) { try { - ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1(); rpcMessage.rpcMsg2ProtocolMsg(message); diff --git a/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto b/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto index 6317a8518ca..dd61bd95f48 100644 --- a/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto +++ b/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto @@ -24,7 +24,7 @@ message GrpcMessageProto { int32 id = 1; int32 messageType = 2; map headMap = 3; - bytes data = 4; + bytes body = 4; } service SeataService { diff --git a/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java b/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java deleted file mode 100644 index 2ef8eac784e..00000000000 --- a/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seata.serializer.protobuf; - -import com.google.protobuf.Any; -import com.google.protobuf.Message; -import org.apache.seata.common.exception.ShouldNeverHappenException; -import org.apache.seata.common.loader.LoadLevel; -import org.apache.seata.core.serializer.Serializer; -import org.apache.seata.serializer.protobuf.convertor.PbConvertor; -import org.apache.seata.serializer.protobuf.manager.ProtobufConvertManager; - -@LoadLevel(name = "GRPC") -public class GrpcSerializer implements Serializer { - @Override - public byte[] serialize(T t) { - PbConvertor pbConvertor = ProtobufConvertManager.getInstance() - .fetchConvertor(t.getClass().getName()); - Any grpcBody = Any.pack((Message) pbConvertor.convert2Proto(t)); - - return grpcBody.toByteArray(); - } - - @Override - public T deserialize(byte[] bytes) { - try { - Any body = Any.parseFrom(bytes); - final Class clazz = ProtobufConvertManager.getInstance().fetchProtoClass(getTypeNameFromTypeUrl(body.getTypeUrl())); - if (body.is(clazz)) { - Object ob = body.unpack(clazz); - PbConvertor pbConvertor = ProtobufConvertManager.getInstance().fetchReversedConvertor(clazz.getName()); - - return (T) pbConvertor.convert2Model(ob); - } - } catch (Throwable e) { - throw new ShouldNeverHappenException("GrpcSerializer deserialize error", e); - } - - return null; - } - - private String getTypeNameFromTypeUrl(String typeUri) { - int pos = typeUri.lastIndexOf('/'); - return pos == -1 ? "" : typeUri.substring(pos + 1); - } -} diff --git a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer b/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer index f6fbf709dea..71098c53674 100644 --- a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer +++ b/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer @@ -14,5 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -org.apache.seata.serializer.protobuf.ProtobufSerializer -org.apache.seata.serializer.protobuf.GrpcSerializer \ No newline at end of file +org.apache.seata.serializer.protobuf.ProtobufSerializer \ No newline at end of file diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java index 581a0e15f2d..e8d0a25744e 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java @@ -23,7 +23,6 @@ import org.apache.seata.common.ConfigurationKeys; import org.apache.seata.common.ConfigurationTestHelper; import org.apache.seata.config.ConfigurationFactory; -import org.apache.seata.core.protocol.generated.GrpcMessageProto; import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; import org.apache.seata.core.rpc.netty.TmNettyRemotingClient; import org.apache.seata.mockserver.MockServer; diff --git a/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto b/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto index cdf1b52f4a8..6a69276654a 100644 --- a/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto +++ b/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto @@ -25,7 +25,7 @@ message GrpcMessageProto { int32 id = 1; int32 messageType = 2; map headMap = 3; - google.protobuf.Any body = 4; + bytes body = 4; } service SeataService { From cb385513a78e001e1f1193c18cdd6c081e8b62a4 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Tue, 1 Oct 2024 20:15:05 +0800 Subject: [PATCH 05/21] codec --- .../org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java | 4 +++- .../org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java index 9f95c72cc42..92e659c7609 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java @@ -74,7 +74,9 @@ public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exc Map headMap = grpcMessageProto.getHeadMapMap(); RpcMessage rpcMsg = new RpcMessage(); - rpcMsg.setMessageType((byte) messageType); + if (messageType <= Byte.MAX_VALUE && messageType >= Byte.MIN_VALUE) { + rpcMsg.setMessageType((byte) messageType); + } rpcMsg.setId(messageId); rpcMsg.setHeadMap(grpcMessageProto.getHeadMapMap()); diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java index e8d0a25744e..e9c87592f90 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java @@ -27,7 +27,6 @@ import org.apache.seata.core.rpc.netty.TmNettyRemotingClient; import org.apache.seata.mockserver.MockServer; import org.apache.seata.serializer.protobuf.generated.*; -import org.apache.seata.core.protocol.generated.SeataServiceGrpc; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; From 53b9b68285ff47bf2f21fbb5c1f321726d7fb1d8 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Tue, 1 Oct 2024 20:39:44 +0800 Subject: [PATCH 06/21] codec --- .../org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java | 4 +++- .../org/apache/seata/protocol/transcation/grpcMessage.proto | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java index e9c87592f90..4e3215fffd4 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java @@ -23,10 +23,12 @@ import org.apache.seata.common.ConfigurationKeys; import org.apache.seata.common.ConfigurationTestHelper; import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.protocol.generated.GrpcMessageProto; import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; import org.apache.seata.core.rpc.netty.TmNettyRemotingClient; import org.apache.seata.mockserver.MockServer; import org.apache.seata.serializer.protobuf.generated.*; +import org.apache.seata.core.protocol.generated.SeataServiceGrpc; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -68,7 +70,7 @@ private GrpcMessageProto getRegisterTMRequest() { .setAbstractIdentifyRequest(abstractIdentifyRequestProto) .build(); - return GrpcMessageProto.newBuilder().setBody(Any.pack(registerTMRequestProto)).build(); + return GrpcMessageProto.newBuilder().setBody(registerTMRequestProto.toByteString()).build(); } private GrpcMessageProto getGlobalBeginRequest() { diff --git a/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto b/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto index 6a69276654a..dd61bd95f48 100644 --- a/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto +++ b/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto @@ -16,7 +16,6 @@ */ syntax = "proto3"; package org.apache.seata.protocol.protobuf; -import "google/protobuf/any.proto"; option java_multiple_files = true; option java_outer_classname = "GrpcMessage"; option java_package = "org.apache.seata.core.protocol.generated"; From 043c59c78e38edc899324b778f6a45d744b1ef6c Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Tue, 1 Oct 2024 21:19:17 +0800 Subject: [PATCH 07/21] codec --- .../apache/seata/core/rpc/netty/mockserver/GrpcTest.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java index 4e3215fffd4..0d63d2eb70f 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java @@ -16,7 +16,6 @@ */ package org.apache.seata.core.rpc.netty.mockserver; -import com.google.protobuf.Any; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; @@ -78,7 +77,7 @@ private GrpcMessageProto getGlobalBeginRequest() { .setTransactionName("test-transaction") .setTimeout(2000) .build(); - return GrpcMessageProto.newBuilder().setBody(Any.pack(globalBeginRequestProto)).build(); + return GrpcMessageProto.newBuilder().setBody(globalBeginRequestProto.toByteString()).build(); } private GrpcMessageProto getBranchRegisterRequest() { @@ -90,7 +89,7 @@ private GrpcMessageProto getBranchRegisterRequest() { .setApplicationData("{\"mock\":\"mock\"}") .build(); - return GrpcMessageProto.newBuilder().setBody(Any.pack(branchRegisterRequestProto)).build(); + return GrpcMessageProto.newBuilder().setBody(branchRegisterRequestProto.toByteString()).build(); } private GrpcMessageProto getGlobalCommitRequest() { @@ -101,7 +100,7 @@ private GrpcMessageProto getGlobalCommitRequest() { .setAbstractGlobalEndRequest(globalEndRequestProto) .build(); - return GrpcMessageProto.newBuilder().setBody(Any.pack(globalCommitRequestProto)).build(); + return GrpcMessageProto.newBuilder().setBody(globalCommitRequestProto.toByteString()).build(); } private GrpcMessageProto getGlobalRollbackRequest() { @@ -112,7 +111,7 @@ private GrpcMessageProto getGlobalRollbackRequest() { .setAbstractGlobalEndRequest(globalEndRequestProto) .build(); - return GrpcMessageProto.newBuilder().setBody(Any.pack(globalRollbackRequestProto)).build(); + return GrpcMessageProto.newBuilder().setBody(globalRollbackRequestProto.toByteString()).build(); } @Test From 2cb1b6523f42a92b2e63c23bea99109fdadaf54b Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Tue, 1 Oct 2024 22:15:07 +0800 Subject: [PATCH 08/21] codec --- .../apache/seata/core/rpc/netty/grpc/GrpcDecoder.java | 2 +- .../apache/seata/core/rpc/netty/grpc/GrpcEncoder.java | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java index 92e659c7609..975298a1fe0 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java @@ -85,12 +85,12 @@ public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exc } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { rpcMsg.setBody(HeartbeatMessage.PONG); } else { - SerializerType serializerType = SerializerType.SEATA; if (headMap.containsKey(GrpcHeaderEnum.COMPRESS_TYPE.header)) { Compressor compressor = CompressorFactory.getCompressor(rpcMsg.getCompressor()); bodyBytes = compressor.decompress(bodyBytes); } + SerializerType serializerType = SerializerType.PROTOBUF; if (headMap.containsKey(GrpcHeaderEnum.CODEC_TYPE.header)) { String codecValue = headMap.get(GrpcHeaderEnum.CODEC_TYPE.header); serializerType = SerializerType.getByCode(Integer.parseInt(codecValue)); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java index c0b66b60d9f..b2254895f82 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java @@ -61,11 +61,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers)); } - byte[] dataBytes = null; + ByteString dataBytes = ByteString.EMPTY; if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec())); - dataBytes = serializer.serialize(body); + dataBytes = ByteString.copyFrom(serializer.serialize(body)); } headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(rpcMessage.getCodec())); headMap.put(GrpcHeaderEnum.COMPRESS_TYPE.header, String.valueOf(rpcMessage.getCompressor())); @@ -73,10 +73,8 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) .putAllHeadMap(headMap) .setMessageType(messageType) .setId(id); - if (dataBytes != null) { - Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor()); - builder.setBody(ByteString.copyFrom(compressor.compress(dataBytes))); - } + Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor()); + builder.setBody(ByteString.copyFrom(compressor.compress(dataBytes.toByteArray()))); GrpcMessageProto grpcMessageProto = builder.build(); byte[] bodyBytes = grpcMessageProto.toByteArray(); From e123370c5890aad6c9d9eb20b215453a2c8e6067 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Tue, 1 Oct 2024 22:46:05 +0800 Subject: [PATCH 09/21] codec --- .../org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java | 7 ++----- .../org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java index 975298a1fe0..8ed1828ee25 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java @@ -90,11 +90,8 @@ public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exc bodyBytes = compressor.decompress(bodyBytes); } - SerializerType serializerType = SerializerType.PROTOBUF; - if (headMap.containsKey(GrpcHeaderEnum.CODEC_TYPE.header)) { - String codecValue = headMap.get(GrpcHeaderEnum.CODEC_TYPE.header); - serializerType = SerializerType.getByCode(Integer.parseInt(codecValue)); - } + String codecValue = headMap.get(GrpcHeaderEnum.CODEC_TYPE.header); + SerializerType serializerType = SerializerType.getByCode(Integer.parseInt(codecValue)); Serializer serializer = SerializerServiceLoader.load(serializerType); Object messageBody = serializer.deserialize(bodyBytes); rpcMsg.setBody(messageBody); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java index b2254895f82..50b0f22f024 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java @@ -67,7 +67,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec())); dataBytes = ByteString.copyFrom(serializer.serialize(body)); } - headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(rpcMessage.getCodec())); + headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.PROTOBUF.getCode())); headMap.put(GrpcHeaderEnum.COMPRESS_TYPE.header, String.valueOf(rpcMessage.getCompressor())); GrpcMessageProto.Builder builder = GrpcMessageProto.newBuilder() .putAllHeadMap(headMap) From ac222283c4acabe75f7792c3db25782f9b23bd18 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Tue, 1 Oct 2024 23:24:07 +0800 Subject: [PATCH 10/21] codec --- .../java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java index 50b0f22f024..d5eea5caff1 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java @@ -64,7 +64,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) ByteString dataBytes = ByteString.EMPTY; if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { - Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec())); + Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.PROTOBUF.getCode())); dataBytes = ByteString.copyFrom(serializer.serialize(body)); } headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.PROTOBUF.getCode())); From 039ed6e463cba25c09f377bac357c5997fcd8348 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Wed, 2 Oct 2024 00:00:16 +0800 Subject: [PATCH 11/21] codec --- .../core/rpc/netty/grpc/GrpcDecoder.java | 3 +- .../netty/mockserver/MockGrpcServerTest.java | 121 ++++++++++++++++++ 2 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java index 8ed1828ee25..c26825b9fc2 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java @@ -86,7 +86,8 @@ public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exc rpcMsg.setBody(HeartbeatMessage.PONG); } else { if (headMap.containsKey(GrpcHeaderEnum.COMPRESS_TYPE.header)) { - Compressor compressor = CompressorFactory.getCompressor(rpcMsg.getCompressor()); + String compressType = headMap.get(GrpcHeaderEnum.COMPRESS_TYPE.header); + Compressor compressor = CompressorFactory.getCompressor(Byte.parseByte(compressType)); bodyBytes = compressor.decompress(bodyBytes); } diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java new file mode 100644 index 00000000000..162d065367e --- /dev/null +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.mockserver; + +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.ConfigurationTestHelper; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.exception.TransactionException; +import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.core.model.TransactionManager; +import org.apache.seata.core.protocol.Protocol; +import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; +import org.apache.seata.core.rpc.netty.TmNettyRemotingClient; +import org.apache.seata.mockserver.MockCoordinator; +import org.apache.seata.mockserver.MockServer; +import org.apache.seata.rm.DefaultResourceManager; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * the type MockServerTest + */ +public class MockGrpcServerTest { + + static String RESOURCE_ID = "mock-action"; + + Logger logger = LoggerFactory.getLogger(MockGrpcServerTest.class); + + @BeforeAll + public static void before() { + ConfigurationFactory.reload(); + ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, String.valueOf(ProtocolTestConstants.MOCK_SERVER_PORT)); + ConfigurationTestHelper.putConfig(ConfigurationKeys.TRANSPORT_PROTOCOL, Protocol.GPRC.value); + MockServer.start(ProtocolTestConstants.MOCK_SERVER_PORT); + TmNettyRemotingClient.getInstance().destroy(); + RmNettyRemotingClient.getInstance().destroy(); + } + + @Test + public void testCommit() throws TransactionException { + String xid = doTestCommit(0); + Assertions.assertEquals(1, Action1Impl.getCommitTimes(xid)); + Assertions.assertEquals(0, Action1Impl.getRollbackTimes(xid)); + } + + @Test + public void testCommitRetry() throws TransactionException { + String xid = doTestCommit(2); + Assertions.assertEquals(3, Action1Impl.getCommitTimes(xid)); + Assertions.assertEquals(0, Action1Impl.getRollbackTimes(xid)); + } + + @Test + public void testRollback() throws TransactionException { + String xid = doTestRollback(0); + Assertions.assertEquals(0, Action1Impl.getCommitTimes(xid)); + Assertions.assertEquals(1, Action1Impl.getRollbackTimes(xid)); + } + + @Test + public void testRollbackRetry() throws TransactionException { + String xid = doTestRollback(2); + Assertions.assertEquals(0, Action1Impl.getCommitTimes(xid)); + Assertions.assertEquals(3, Action1Impl.getRollbackTimes(xid)); + } + + @Test + public void testTm() throws Exception { + TmClientTest.testTm(); + } + + @Test + public void testRm() throws Exception { + RmClientTest.testRm(); + } + + private String doTestCommit(int times) throws TransactionException { + TransactionManager tm = TmClientTest.getTm(); + DefaultResourceManager rm = RmClientTest.getRm(RESOURCE_ID); + + String xid = tm.begin(ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP, "test-commit", 60000); + MockCoordinator.getInstance().setExpectedRetry(xid, times); + Long branchId = rm.branchRegister(BranchType.TCC, RESOURCE_ID, "1", xid, "{\"mock\":\"mock\"}", "1"); + GlobalStatus commit = tm.commit(xid); + Assertions.assertEquals(GlobalStatus.Committed, commit); + return xid; + + } + + private String doTestRollback(int times) throws TransactionException { + TransactionManager tm = TmClientTest.getTm(); + DefaultResourceManager rm = RmClientTest.getRm(RESOURCE_ID); + + String xid = tm.begin(ProtocolTestConstants.APPLICATION_ID, ProtocolTestConstants.SERVICE_GROUP, "test-rollback", 60000); + logger.info("doTestRollback xid:{}", xid); + MockCoordinator.getInstance().setExpectedRetry(xid, times); + Long branchId = rm.branchRegister(BranchType.TCC, RESOURCE_ID, "1", xid, "{\"mock\":\"mock\"}", "1"); + GlobalStatus rollback = tm.rollback(xid); + Assertions.assertEquals(GlobalStatus.Rollbacked, rollback); + return xid; + + } +} From 05e377ff4b7bc8d0ae78e957585e67fbcffadccd Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Wed, 2 Oct 2024 12:23:09 +0800 Subject: [PATCH 12/21] codec --- .../core/rpc/netty/mockserver/MockGrpcServerTest.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java index 162d065367e..ce9c52dbd85 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java @@ -29,6 +29,7 @@ import org.apache.seata.mockserver.MockCoordinator; import org.apache.seata.mockserver.MockServer; import org.apache.seata.rm.DefaultResourceManager; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -54,6 +55,15 @@ public static void before() { RmNettyRemotingClient.getInstance().destroy(); } + @AfterAll + public static void after() { + //MockServer.close(); + ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL); + ConfigurationTestHelper.removeConfig(ConfigurationKeys.TRANSPORT_PROTOCOL); + TmNettyRemotingClient.getInstance().destroy(); + RmNettyRemotingClient.getInstance().destroy(); + } + @Test public void testCommit() throws TransactionException { String xid = doTestCommit(0); From adde917de87fffadb790467d6d5f8f802f6ee122 Mon Sep 17 00:00:00 2001 From: yiqi <77573225+PleaseGiveMeTheCoke@users.noreply.github.com> Date: Wed, 2 Oct 2024 12:25:58 +0800 Subject: [PATCH 13/21] Update core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java Co-authored-by: funkye <364176773@qq.com> --- .../seata/core/rpc/netty/grpc/GrpcDecoder.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java index c26825b9fc2..24b2c6bd7c5 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java @@ -85,14 +85,17 @@ public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exc } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { rpcMsg.setBody(HeartbeatMessage.PONG); } else { - if (headMap.containsKey(GrpcHeaderEnum.COMPRESS_TYPE.header)) { - String compressType = headMap.get(GrpcHeaderEnum.COMPRESS_TYPE.header); - Compressor compressor = CompressorFactory.getCompressor(Byte.parseByte(compressType)); + String compressType = headMap.get(GrpcHeaderEnum.COMPRESS_TYPE.header);; + if (StringUtils.isNotBlank(compressType)) { + byte compress = Byte.parseByte(compressType); + rpcMsg.setCompressor(compress); + Compressor compressor = CompressorFactory.getCompressor(compress); bodyBytes = compressor.decompress(bodyBytes); } - String codecValue = headMap.get(GrpcHeaderEnum.CODEC_TYPE.header); - SerializerType serializerType = SerializerType.getByCode(Integer.parseInt(codecValue)); + int codec = Integer.parseInt(codecValue); + SerializerType serializerType = SerializerType.getByCode(codec); + rpcMsg.setCodec(serializerType.getCode()); Serializer serializer = SerializerServiceLoader.load(serializerType); Object messageBody = serializer.deserialize(bodyBytes); rpcMsg.setBody(messageBody); From e72ddf30d7aee82cda0d636b980ef12e85f4a9dc Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Wed, 2 Oct 2024 12:45:29 +0800 Subject: [PATCH 14/21] codec --- .../java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java index 24b2c6bd7c5..55be72fe180 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java @@ -23,6 +23,7 @@ import io.netty.handler.codec.http2.Http2HeadersFrame; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; +import org.apache.commons.lang.StringUtils; import org.apache.seata.core.compressor.Compressor; import org.apache.seata.core.compressor.CompressorFactory; import org.apache.seata.core.protocol.HeartbeatMessage; From 37e94fd2654105b72db91471b459e0b6c4044a1e Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Wed, 2 Oct 2024 12:52:05 +0800 Subject: [PATCH 15/21] codec --- .../java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java index 55be72fe180..aebe99b55b0 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java @@ -86,7 +86,7 @@ public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exc } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { rpcMsg.setBody(HeartbeatMessage.PONG); } else { - String compressType = headMap.get(GrpcHeaderEnum.COMPRESS_TYPE.header);; + String compressType = headMap.get(GrpcHeaderEnum.COMPRESS_TYPE.header); if (StringUtils.isNotBlank(compressType)) { byte compress = Byte.parseByte(compressType); rpcMsg.setCompressor(compress); From a23ff8ba6b4ea6db413ef55fa94f9c6ac0aae7b8 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Wed, 2 Oct 2024 13:29:50 +0800 Subject: [PATCH 16/21] codec --- .../core/rpc/netty/mockserver/MockGrpcServerTest.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java index ce9c52dbd85..d0a7792b494 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java @@ -92,16 +92,6 @@ public void testRollbackRetry() throws TransactionException { Assertions.assertEquals(3, Action1Impl.getRollbackTimes(xid)); } - @Test - public void testTm() throws Exception { - TmClientTest.testTm(); - } - - @Test - public void testRm() throws Exception { - RmClientTest.testRm(); - } - private String doTestCommit(int times) throws TransactionException { TransactionManager tm = TmClientTest.getTm(); DefaultResourceManager rm = RmClientTest.getRm(RESOURCE_ID); From 6e51a7db52f2f78eb8303235665dc237cdc1e5a1 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Wed, 2 Oct 2024 13:45:15 +0800 Subject: [PATCH 17/21] codec --- console/pom.xml | 77 ------------------- .../netty/mockserver/MockGrpcServerTest.java | 1 - 2 files changed, 78 deletions(-) diff --git a/console/pom.xml b/console/pom.xml index 16fba6a55ee..f5a5b93012c 100644 --- a/console/pom.xml +++ b/console/pom.xml @@ -166,81 +166,4 @@ ${project.version} - - - - - com.github.eirslett - frontend-maven-plugin - - src/main/resources/static/console-fe - - - - install node and npm - - install-node-and-npm - - generate-resources - - v19.5.0 - - - - npm install - - npm - - generate-resources - - install - - - - npm build - - npm - - generate-resources - - run build - - ${project.version} - - - - - - - org.apache.maven.plugins - maven-resources-plugin - - - copy-resources-static - generate-resources - - copy-resources - - - src/main/resources/static - - - src/main/resources/static/console-fe/dist - - - - - - - - - - src/main/resources - - **/node_modules/** - static/console-fe - - - - diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java index d0a7792b494..3744ddd270f 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/MockGrpcServerTest.java @@ -102,7 +102,6 @@ private String doTestCommit(int times) throws TransactionException { GlobalStatus commit = tm.commit(xid); Assertions.assertEquals(GlobalStatus.Committed, commit); return xid; - } private String doTestRollback(int times) throws TransactionException { From 60221df8c838442715b7b87fb6ed24917ff0f195 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Wed, 2 Oct 2024 13:46:24 +0800 Subject: [PATCH 18/21] codec --- console/pom.xml | 77 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/console/pom.xml b/console/pom.xml index f5a5b93012c..16fba6a55ee 100644 --- a/console/pom.xml +++ b/console/pom.xml @@ -166,4 +166,81 @@ ${project.version} + + + + + com.github.eirslett + frontend-maven-plugin + + src/main/resources/static/console-fe + + + + install node and npm + + install-node-and-npm + + generate-resources + + v19.5.0 + + + + npm install + + npm + + generate-resources + + install + + + + npm build + + npm + + generate-resources + + run build + + ${project.version} + + + + + + + org.apache.maven.plugins + maven-resources-plugin + + + copy-resources-static + generate-resources + + copy-resources + + + src/main/resources/static + + + src/main/resources/static/console-fe/dist + + + + + + + + + + src/main/resources + + **/node_modules/** + static/console-fe + + + + From 18a7d0ed0e9e72e0be0564ea2ab0a8fe19169510 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Wed, 2 Oct 2024 23:52:18 +0800 Subject: [PATCH 19/21] codec --- .../apache/seata/core/rpc/netty/grpc/GrpcEncoder.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java index d5eea5caff1..dbbbfe1be48 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java @@ -61,11 +61,15 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers)); } - ByteString dataBytes = ByteString.EMPTY; + ByteString dataBytes; if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.PROTOBUF.getCode())); - dataBytes = ByteString.copyFrom(serializer.serialize(body)); + byte[] serializedBytes = serializer.serialize(body); + Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor()); + dataBytes = ByteString.copyFrom(compressor.compress(serializedBytes)); + } else { + dataBytes = ByteString.EMPTY; } headMap.put(GrpcHeaderEnum.CODEC_TYPE.header, String.valueOf(SerializerType.PROTOBUF.getCode())); headMap.put(GrpcHeaderEnum.COMPRESS_TYPE.header, String.valueOf(rpcMessage.getCompressor())); @@ -73,8 +77,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) .putAllHeadMap(headMap) .setMessageType(messageType) .setId(id); - Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor()); - builder.setBody(ByteString.copyFrom(compressor.compress(dataBytes.toByteArray()))); + builder.setBody(ByteString.copyFrom(dataBytes.toByteArray())); GrpcMessageProto grpcMessageProto = builder.build(); byte[] bodyBytes = grpcMessageProto.toByteArray(); From 08a0492bdb1d1a6eb1c1da872f05b2e207318a94 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Thu, 3 Oct 2024 22:02:32 +0800 Subject: [PATCH 20/21] codec --- .../apache/seata/core/rpc/netty/grpc/GrpcDecoder.java | 9 --------- .../org/apache/seata/core/serializer/SerializerType.java | 7 ------- 2 files changed, 16 deletions(-) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java index aebe99b55b0..71c9caf8be9 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java @@ -115,13 +115,4 @@ public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exc public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame headersFrame) throws Exception { // TODO Subsequent decompression logic is possible } - - private int safeCastToInt(long value) { - if (value >= Integer.MIN_VALUE && value <= Integer.MAX_VALUE) { - return (int) value; - } else { - throw new IllegalArgumentException("Value exceeds int range: " + value); - } - } - } diff --git a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java index 8c5d21f68f5..c60067e72bc 100644 --- a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java +++ b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java @@ -63,13 +63,6 @@ public enum SerializerType { * Math.pow(2, 5) */ JACKSON((byte)0x32), - - /** - * The grpc. - *

- * Math.pow(2, 6) - */ - GRPC((byte)0x64) ; private final byte code; From ed431522dcf3e33518bcfe82aaf2b5064d4f4748 Mon Sep 17 00:00:00 2001 From: yiqi <1455432762@qq.com> Date: Fri, 4 Oct 2024 23:06:51 +0800 Subject: [PATCH 21/21] codec --- changes/en-us/2.x.md | 2 ++ changes/zh-cn/2.x.md | 3 ++- script/client/conf/file.conf | 2 ++ script/client/spring/application.properties | 1 + script/client/spring/application.yml | 1 + script/config-center/config.txt | 1 + 6 files changed, 9 insertions(+), 1 deletion(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index fcdb6c6523a..2dd147c01b5 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -5,6 +5,7 @@ Add changes here for all PR submitted to the 2.x branch. ### feature: - [[#6876](https://github.com/apache/incubator-seata/pull/6876)]support kingbase +- [[#6881](https://github.com/apache/incubator-seata/pull/6881)]support grpc ### bugfix: @@ -35,6 +36,7 @@ Thanks to these contributors for their code commits. Please report an unintended - [dk2k](https://github.com/dk2k) - [MaoMaoandSnail](https://github.com/MaoMaoandSnail) - [yougecn](https://github.com/yougecn) +- [PleaseGiveMeTheCoke](https://github.com/PleaseGiveMeTheCoke) diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 016c32c4651..624db67f904 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -4,6 +4,7 @@ ### feature: [[#6876](https://github.com/apache/incubator-seata/pull/6876)]支持人大金仓数据库(kingbase) +[[#6881](https://github.com/apache/incubator-seata/pull/6881)]全链路支持grpc ### bugfix: @@ -35,7 +36,7 @@ - [dk2k](https://github.com/dk2k) - [MaoMaoandSnail](https://github.com/MaoMaoandSnail) - [yougecn](https://github.com/yougecn) - +- [PleaseGiveMeTheCoke](https://github.com/PleaseGiveMeTheCoke) 同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。 diff --git a/script/client/conf/file.conf b/script/client/conf/file.conf index 92c836e17a6..4b121660922 100644 --- a/script/client/conf/file.conf +++ b/script/client/conf/file.conf @@ -16,6 +16,8 @@ # transport { + # communication protocols, seata or grpc, default seata + protocol = "seata" # tcp, unix-domain-socket type = "TCP" #NIO, NATIVE diff --git a/script/client/spring/application.properties b/script/client/spring/application.properties index cb7d93c2ab7..2a72d1e5f79 100755 --- a/script/client/spring/application.properties +++ b/script/client/spring/application.properties @@ -64,6 +64,7 @@ seata.log.exception-rate=100 seata.service.vgroup-mapping.default_tx_group=default seata.service.grouplist.default=127.0.0.1:8091 seata.service.disable-global-transaction=false +seata.transport.protocol=seata seata.transport.shutdown.wait=3 seata.transport.thread-factory.boss-thread-prefix=NettyBoss seata.transport.thread-factory.worker-thread-prefix=NettyServerNIOWorker diff --git a/script/client/spring/application.yml b/script/client/spring/application.yml index 9eef693d332..a6100f05740 100755 --- a/script/client/spring/application.yml +++ b/script/client/spring/application.yml @@ -73,6 +73,7 @@ seata: default: 127.0.0.1:8091 disable-global-transaction: false transport: + protocol: seata shutdown: wait: 3 thread-factory: diff --git a/script/config-center/config.txt b/script/config-center/config.txt index 99cd7bd1313..8cf986f3f94 100644 --- a/script/config-center/config.txt +++ b/script/config-center/config.txt @@ -17,6 +17,7 @@ #For details about configuration items, see https://seata.apache.org/zh-cn/docs/user/configurations #Transport configuration, for client and server +transport.protocol=seata transport.type=TCP transport.server=NIO transport.heartbeat=true