diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index dec76f51b7c..ff8436b6dc9 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -628,6 +628,8 @@ public interface ConfigurationKeys { @Deprecated String ENABLE_CLIENT_BATCH_SEND_REQUEST = TRANSPORT_PREFIX + "enableClientBatchSendRequest"; + String TRANSPORT_PROTOCOL = TRANSPORT_PREFIX + "protocol"; + /** * The constant ENABLE_TM_CLIENT_BATCH_SEND_REQUEST */ diff --git a/common/src/main/java/org/apache/seata/common/DefaultValues.java b/common/src/main/java/org/apache/seata/common/DefaultValues.java index 8c484a1ab0f..eb0d40bb308 100644 --- a/common/src/main/java/org/apache/seata/common/DefaultValues.java +++ b/common/src/main/java/org/apache/seata/common/DefaultValues.java @@ -63,6 +63,7 @@ public interface DefaultValues { String DEFAULT_BOSS_THREAD_PREFIX = "NettyBoss"; String DEFAULT_NIO_WORKER_THREAD_PREFIX = "NettyServerNIOWorker"; String DEFAULT_EXECUTOR_THREAD_PREFIX = "NettyServerBizHandler"; + String DEFAULT_PROTOCOL = "seata"; boolean DEFAULT_TRANSPORT_HEARTBEAT = true; boolean DEFAULT_TRANSACTION_UNDO_DATA_VALIDATION = true; diff --git a/core/pom.xml b/core/pom.xml index 9de6107bc03..26ce3dc9018 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -69,6 +69,10 @@ fastjson test + + com.google.protobuf + protobuf-java + @@ -90,6 +94,23 @@ + + org.xolstice.maven.plugins + protobuf-maven-plugin + + ${project.basedir}/src/main/resources/protobuf/org/apache/seata/protocol/transcation/ + + com.google.protobuf:protoc:3.25.4:exe:${os.detected.classifier} + + + + + + compile + + + + diff --git a/core/src/main/java/org/apache/seata/core/protocol/Protocol.java b/core/src/main/java/org/apache/seata/core/protocol/Protocol.java new file mode 100644 index 00000000000..fe3cc000cfc --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/protocol/Protocol.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.protocol; + +/** + * seata transport protocol + */ +public enum Protocol { + + /** + * grpc + */ + GPRC("grpc"), + + /** + * seata + */ + SEATA("seata"); + + public final String value; + + Protocol(String value) { + this.value = value; + } +} diff --git a/core/src/main/java/org/apache/seata/core/protocol/detector/Http2Detector.java b/core/src/main/java/org/apache/seata/core/protocol/detector/Http2Detector.java new file mode 100644 index 00000000000..a004894f9b1 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/protocol/detector/Http2Detector.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.protocol.detector; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2StreamChannel; +import io.netty.util.CharsetUtil; +import org.apache.seata.core.rpc.netty.grpc.GrpcDecoder; +import org.apache.seata.core.rpc.netty.grpc.GrpcEncoder; + +public class Http2Detector implements ProtocolDetector { + private static final byte[] HTTP2_PREFIX_BYTES = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(CharsetUtil.UTF_8); + private ChannelHandler[] serverHandlers; + + public Http2Detector(ChannelHandler[] serverHandlers) { + this.serverHandlers = serverHandlers; + } + + @Override + public boolean detect(ByteBuf in) { + if (in.readableBytes() < HTTP2_PREFIX_BYTES.length) { + return false; + } + for (int i = 0; i < HTTP2_PREFIX_BYTES.length; i++) { + if (in.getByte(i) != HTTP2_PREFIX_BYTES[i]) { + return false; + } + } + return true; + } + + @Override + public ChannelHandler[] getHandlers() { + return new ChannelHandler[]{ + Http2FrameCodecBuilder.forServer().build(), + new Http2MultiplexHandler(new ChannelInitializer() { + @Override + protected void initChannel(Http2StreamChannel ch) { + final ChannelPipeline p = ch.pipeline(); + p.addLast(new GrpcDecoder()); + p.addLast(new GrpcEncoder()); + p.addLast(serverHandlers); + } + }) + }; + } +} \ No newline at end of file diff --git a/core/src/main/java/org/apache/seata/core/protocol/detector/ProtocolDetector.java b/core/src/main/java/org/apache/seata/core/protocol/detector/ProtocolDetector.java new file mode 100644 index 00000000000..89d5d10e7be --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/protocol/detector/ProtocolDetector.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.protocol.detector; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; + +public interface ProtocolDetector { + boolean detect(ByteBuf in); + + ChannelHandler[] getHandlers(); +} diff --git a/core/src/main/java/org/apache/seata/core/protocol/detector/SeataDetector.java b/core/src/main/java/org/apache/seata/core/protocol/detector/SeataDetector.java new file mode 100644 index 00000000000..b9c30b0bc6d --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/protocol/detector/SeataDetector.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.protocol.detector; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import org.apache.seata.core.rpc.netty.MultiProtocolDecoder; + +public class SeataDetector implements ProtocolDetector { + private static final byte[] MAGIC_CODE_BYTES = {(byte) 0xda, (byte) 0xda}; + private ChannelHandler[] serverHandlers; + + public SeataDetector(ChannelHandler[] serverHandlers) { + this.serverHandlers = serverHandlers; + } + + @Override + public boolean detect(ByteBuf in) { + if (in.readableBytes() < MAGIC_CODE_BYTES.length) { + return false; + } + for (int i = 0; i < MAGIC_CODE_BYTES.length; i++) { + if (in.getByte(i) != MAGIC_CODE_BYTES[i]) { + return false; + } + } + return true; + } + + @Override + public ChannelHandler[] getHandlers() { + MultiProtocolDecoder multiProtocolDecoder = new MultiProtocolDecoder(serverHandlers); + + return new ChannelHandler[]{multiProtocolDecoder}; + } +} \ No newline at end of file diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java index 4aaafc0acb0..68ee02d71c0 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java @@ -18,8 +18,11 @@ import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; @@ -28,13 +31,19 @@ import io.netty.channel.epoll.EpollMode; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2StreamChannelBootstrap; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.internal.PlatformDependent; import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.thread.NamedThreadFactory; +import org.apache.seata.core.protocol.Protocol; import org.apache.seata.core.rpc.RemotingBootstrap; +import org.apache.seata.core.rpc.netty.grpc.GrpcDecoder; +import org.apache.seata.core.rpc.netty.grpc.GrpcEncoder; import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1; import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1; import org.slf4j.Logger; @@ -130,12 +139,19 @@ public void start() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); - pipeline - .addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), + pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), nettyClientConfig.getChannelMaxWriteIdleSeconds(), - nettyClientConfig.getChannelMaxAllIdleSeconds())) - .addLast(new ProtocolDecoderV1()) - .addLast(new ProtocolEncoderV1()); + nettyClientConfig.getChannelMaxAllIdleSeconds())); + if (nettyClientConfig.getProtocol().equals(Protocol.GPRC.value)) { + pipeline.addLast(Http2FrameCodecBuilder.forClient().build()) + .addLast(new Http2MultiplexHandler(new ChannelDuplexHandler())) + .addLast(new GrpcDecoder()) + .addLast(new GrpcEncoder()); + } else { + pipeline.addLast(new ProtocolDecoderV1()) + .addLast(new ProtocolEncoderV1()); + } + if (channelHandlers != null) { addChannelPipelineLast(ch, channelHandlers); } @@ -177,9 +193,28 @@ public Channel getNewChannel(InetSocketAddress address) { } else { channel = f.channel(); } + + // TODO tmp only for grpc + if (nettyClientConfig.getProtocol().equals(Protocol.GPRC.value)) { + Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(channel); + bootstrap.handler(new ChannelInboundHandlerAdapter() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + channel.pipeline().addLast(new GrpcDecoder()); + channel.pipeline().addLast(new GrpcEncoder()); + if (channelHandlers != null) { + addChannelPipelineLast(channel, channelHandlers); + } + } + }); + channel = bootstrap.open().get(); + } + } catch (Exception e) { throw new FrameworkException(e, "can not connect to services-server."); } + return channel; } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java index f0e047ad58d..68583608262 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java @@ -21,6 +21,7 @@ import org.apache.seata.core.rpc.TransportServerType; import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST; +import static org.apache.seata.common.DefaultValues.DEFAULT_PROTOCOL; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_RM_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TM_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_SELECTOR_THREAD_PREFIX; @@ -451,6 +452,10 @@ public String getRmDispatchThreadPrefix() { return RPC_DISPATCH_THREAD_PREFIX + "_" + NettyPoolKey.TransactionRole.RMROLE.name(); } + public String getProtocol() { + return CONFIG.getConfig(org.apache.seata.common.ConfigurationKeys.TRANSPORT_PROTOCOL, DEFAULT_PROTOCOL); + } + @Deprecated public static boolean isEnableClientBatchSendRequest() { return ENABLE_CLIENT_BATCH_SEND_REQUEST; diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java index c7b2aa57c21..b589396e5ab 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java @@ -92,6 +92,10 @@ protected void setChannelHandlers(final ChannelHandler... handlers) { } } + protected ChannelHandler[] getChannelHandlers() { + return channelHandlers; + } + /** * Add channel pipeline last. * @@ -158,10 +162,8 @@ public void start() { .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) { - MultiProtocolDecoder multiProtocolDecoder = new MultiProtocolDecoder(channelHandlers); - ch.pipeline() - .addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) - .addLast(multiProtocolDecoder); + ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) + .addLast(new ProtocolDetectHandler(NettyServerBootstrap.this)); } }); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java new file mode 100644 index 00000000000..9f1b5f8c113 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.seata.core.protocol.detector.Http2Detector; +import org.apache.seata.core.protocol.detector.ProtocolDetector; +import org.apache.seata.core.protocol.detector.SeataDetector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class ProtocolDetectHandler extends ByteToMessageDecoder { + private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDetectHandler.class); + private NettyServerBootstrap nettyServerBootstrap; + private ProtocolDetector[] supportedProtocolDetectors; + + public ProtocolDetectHandler(NettyServerBootstrap nettyServerBootstrap) { + this.nettyServerBootstrap = nettyServerBootstrap; + this.supportedProtocolDetectors = new ProtocolDetector[]{new Http2Detector(nettyServerBootstrap.getChannelHandlers()), new SeataDetector(nettyServerBootstrap.getChannelHandlers())}; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + for (ProtocolDetector protocolDetector : supportedProtocolDetectors) { + if (protocolDetector.detect(in)) { + ChannelHandler[] protocolHandlers = protocolDetector.getHandlers(); + ctx.pipeline().addLast(protocolHandlers); + ctx.pipeline().remove(this); + + in.resetReaderIndex(); + return; + } + + in.resetReaderIndex(); + } + + byte[] preface = new byte[in.readableBytes()]; + in.readBytes(preface); + LOGGER.error("Can not recognize protocol from remote {}, preface = {}", ctx.channel().remoteAddress(), preface); + in.clear(); + ctx.close(); + } +} \ No newline at end of file diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java new file mode 100644 index 00000000000..bcde5702eb3 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcDecoder.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.grpc; + +import com.google.protobuf.Any; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http2.Http2DataFrame; +import io.netty.handler.codec.http2.Http2HeadersFrame; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.generated.GrpcMessageProto; +import org.apache.seata.core.serializer.Serializer; +import org.apache.seata.core.serializer.SerializerServiceLoader; +import org.apache.seata.core.serializer.SerializerType; + +public class GrpcDecoder extends ChannelDuplexHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof Http2HeadersFrame) { + onHeadersRead(ctx, (Http2HeadersFrame) msg); + } else if (msg instanceof Http2DataFrame) { + onDataRead(ctx, (Http2DataFrame) msg); + } else if (msg instanceof ReferenceCounted) { + ReferenceCountUtil.release(msg); + } + } + + public void onDataRead(ChannelHandlerContext ctx, Http2DataFrame msg) throws Exception { + ByteBuf content = msg.content(); + try { + int readableBytes = content.readableBytes(); + byte[] bytes = new byte[readableBytes]; + content.readBytes(bytes); + if (bytes.length < 5) { + return; + } + + int srcPos = 0; + while (srcPos < readableBytes) { + // The first byte defaults to 0, indicating that no decompression is required + // Read the value of the next four bytes as the length of the body + int length = ((bytes[srcPos + 1] & 0xFF) << 24) | ((bytes[srcPos + 2] & 0xFF) << 16) + | ((bytes[srcPos + 3] & 0xFF) << 8) | (bytes[srcPos + 4] & 0xFF); + + byte[] data = new byte[length]; + System.arraycopy(bytes, srcPos + 5, data, 0, length); + + GrpcMessageProto grpcMessageProto = GrpcMessageProto.parseFrom(data); + Any body = grpcMessageProto.getBody(); + int messageType = safeCastToInt(grpcMessageProto.getMessageType()); + int messageId = safeCastToInt(grpcMessageProto.getId()); + byte[] byteArray = body.toByteArray(); + + Serializer serializer = + SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.GRPC.getCode()), (byte)0); + Object messageBody = serializer.deserialize(byteArray); + + RpcMessage rpcMsg = new RpcMessage(); + rpcMsg.setMessageType((byte)messageType); + rpcMsg.setBody(messageBody); + rpcMsg.setId(messageId); + rpcMsg.setHeadMap(grpcMessageProto.getHeadMapMap()); + + ctx.fireChannelRead(rpcMsg); + + srcPos += length + 5; + } + } finally { + ReferenceCountUtil.release(content); + } + } + + + public void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame headersFrame) throws Exception { + // TODO Subsequent decompression logic is possible + } + + private int safeCastToInt(long value) { + if (value >= Integer.MIN_VALUE && value <= Integer.MAX_VALUE) { + return (int)value; + } else { + throw new IllegalArgumentException("Value exceeds int range: " + value); + } + } + +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java new file mode 100644 index 00000000000..523f057dc67 --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcEncoder.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.grpc; + +import com.google.protobuf.Any; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http2.DefaultHttp2DataFrame; +import io.netty.handler.codec.http2.DefaultHttp2Headers; +import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; +import io.netty.handler.codec.http2.Http2Headers; +import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.generated.GrpcMessageProto; +import org.apache.seata.core.serializer.Serializer; +import org.apache.seata.core.serializer.SerializerServiceLoader; +import org.apache.seata.core.serializer.SerializerType; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class GrpcEncoder extends ChannelOutboundHandlerAdapter { + private final AtomicBoolean headerSent = new AtomicBoolean(false); + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (!(msg instanceof RpcMessage)) { + throw new UnsupportedOperationException("GrpcEncoder not support class:" + msg.getClass()); + } + + RpcMessage rpcMessage = (RpcMessage) msg; + byte messageType = rpcMessage.getMessageType(); + Map headMap = rpcMessage.getHeadMap(); + Object body = rpcMessage.getBody(); + int id = rpcMessage.getId(); + + if (headerSent.compareAndSet(false, true)) + { + Http2Headers headers = new DefaultHttp2Headers(); + headers.add(GrpcHeaderEnum.HTTP2_STATUS.header, String.valueOf(200)); + headers.add(GrpcHeaderEnum.GRPC_STATUS.header, String.valueOf(0)); + headers.add(GrpcHeaderEnum.GRPC_CONTENT_TYPE.header, "application/grpc"); + ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers)); + } + + Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(SerializerType.GRPC.getCode()), (byte) 0); + Any messageBody = Any.parseFrom(serializer.serialize(body)); + GrpcMessageProto grpcMessageProto = GrpcMessageProto.newBuilder() + .setBody(messageBody) + .putAllHeadMap(headMap) + .setMessageType(messageType) + .setId(id).build(); + byte[] bodyBytes = grpcMessageProto.toByteArray(); + if (bodyBytes != null) + { + byte[] messageWithPrefix = new byte[bodyBytes.length + 5]; + // The first byte is 0, indicating no compression + messageWithPrefix[0] = 0; + ByteBuffer buffer = ByteBuffer.allocate(4); + buffer.putInt(bodyBytes.length); + byte[] lengthBytes = buffer.array(); + // The last four bytes indicate the length + System.arraycopy(lengthBytes, 0, messageWithPrefix, 1, 4); + // The remaining bytes are body + System.arraycopy(bodyBytes, 0, messageWithPrefix, 5, bodyBytes.length); + ctx.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.wrappedBuffer(messageWithPrefix))); + } + } + +} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java new file mode 100644 index 00000000000..2a803cedbec --- /dev/null +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/grpc/GrpcHeaderEnum.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.grpc; + +public enum GrpcHeaderEnum { + + /** + * grpc status + */ + GRPC_STATUS("grpc-status"), + /** + * http2 status + */ + HTTP2_STATUS(":status"), + /** + * content-type + */ + GRPC_CONTENT_TYPE("content-type"); + + public final String header; + + GrpcHeaderEnum(String header) { + this.header = header; + } +} diff --git a/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java b/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java index 63ce440edd1..0aa9bd340e8 100644 --- a/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java +++ b/core/src/main/java/org/apache/seata/core/serializer/SerializerServiceLoader.java @@ -72,8 +72,7 @@ public static Serializer load(SerializerType type, byte version) throws Enhanced "Please manually reference 'org.apache.seata:seata-serializer-protobuf' dependency."); } - - String key = serialzerKey(type, version); + String key = serializerKey(type, version); Serializer serializer = SERIALIZER_MAP.get(key); if (serializer == null) { if (type == SerializerType.SEATA) { @@ -86,7 +85,30 @@ public static Serializer load(SerializerType type, byte version) throws Enhanced return serializer; } - private static String serialzerKey(SerializerType type, byte version) { + /** + * Load the service of {@link Serializer} + * + * @param type the serializer type + * @return the service of {@link Serializer} + * @throws EnhancedServiceNotFoundException the enhanced service not found exception + */ + public static Serializer load(SerializerType type) throws EnhancedServiceNotFoundException { + if (type == SerializerType.PROTOBUF && !CONTAINS_PROTOBUF_DEPENDENCY) { + throw new EnhancedServiceNotFoundException("The class '" + PROTOBUF_SERIALIZER_CLASS_NAME + "' not found. " + + "Please manually reference 'org.apache.seata:seata-serializer-protobuf' dependency."); + } + + String key = type.name(); + Serializer serializer = SERIALIZER_MAP.get(key); + if (serializer == null) { + serializer = EnhancedServiceLoader.load(Serializer.class, type.name()); + + SERIALIZER_MAP.put(key, serializer); + } + return serializer; + } + + private static String serializerKey(SerializerType type, byte version) { if (type == SerializerType.SEATA) { return type.name() + version; } diff --git a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java index c60067e72bc..8c5d21f68f5 100644 --- a/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java +++ b/core/src/main/java/org/apache/seata/core/serializer/SerializerType.java @@ -63,6 +63,13 @@ public enum SerializerType { * Math.pow(2, 5) */ JACKSON((byte)0x32), + + /** + * The grpc. + *

+ * Math.pow(2, 6) + */ + GRPC((byte)0x64) ; private final byte code; diff --git a/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto b/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto new file mode 100644 index 00000000000..cdf1b52f4a8 --- /dev/null +++ b/core/src/main/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; +package org.apache.seata.protocol.protobuf; +import "google/protobuf/any.proto"; +option java_multiple_files = true; +option java_outer_classname = "GrpcMessage"; +option java_package = "org.apache.seata.core.protocol.generated"; + +message GrpcMessageProto { + int32 id = 1; + int32 messageType = 2; + map headMap = 3; + google.protobuf.Any body = 4; +} + +service SeataService { + rpc sendRequest (stream GrpcMessageProto) returns (stream GrpcMessageProto); +} \ No newline at end of file diff --git a/dependencies/pom.xml b/dependencies/pom.xml index 4bf0450d4f3..15178f14166 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -76,7 +76,6 @@ 4.1.101.Final 4.0.3 1.6.7 - 3.25.4 1.66.0 5.4.0 @@ -623,6 +622,11 @@ grpc-core ${grpc.version} + + io.grpc + grpc-alts + ${grpc.version} + io.grpc grpc-api diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java index 8b4caa4e8f6..64f38a385d0 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/properties/TransportProperties.java @@ -23,6 +23,7 @@ import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST; import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_TC_SERVER_BATCH_SEND_RESPONSE; import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST; +import static org.apache.seata.common.DefaultValues.DEFAULT_PROTOCOL; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_RM_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TC_REQUEST_TIMEOUT; import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TM_REQUEST_TIMEOUT; @@ -54,6 +55,8 @@ public class TransportProperties { */ private String compressor = "none"; + private String protocol = DEFAULT_PROTOCOL; + /** * enable client batch send request */ @@ -193,4 +196,12 @@ public long getRpcTcRequestTimeout() { public void setRpcTcRequestTimeout(long rpcTcRequestTimeout) { this.rpcTcRequestTimeout = rpcTcRequestTimeout; } + + public String getProtocol() { + return protocol; + } + + public void setProtocol(String protocol) { + this.protocol = protocol; + } } diff --git a/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java b/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java new file mode 100644 index 00000000000..2ef8eac784e --- /dev/null +++ b/serializer/seata-serializer-protobuf/src/main/java/org/apache/seata/serializer/protobuf/GrpcSerializer.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.serializer.protobuf; + +import com.google.protobuf.Any; +import com.google.protobuf.Message; +import org.apache.seata.common.exception.ShouldNeverHappenException; +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.core.serializer.Serializer; +import org.apache.seata.serializer.protobuf.convertor.PbConvertor; +import org.apache.seata.serializer.protobuf.manager.ProtobufConvertManager; + +@LoadLevel(name = "GRPC") +public class GrpcSerializer implements Serializer { + @Override + public byte[] serialize(T t) { + PbConvertor pbConvertor = ProtobufConvertManager.getInstance() + .fetchConvertor(t.getClass().getName()); + Any grpcBody = Any.pack((Message) pbConvertor.convert2Proto(t)); + + return grpcBody.toByteArray(); + } + + @Override + public T deserialize(byte[] bytes) { + try { + Any body = Any.parseFrom(bytes); + final Class clazz = ProtobufConvertManager.getInstance().fetchProtoClass(getTypeNameFromTypeUrl(body.getTypeUrl())); + if (body.is(clazz)) { + Object ob = body.unpack(clazz); + PbConvertor pbConvertor = ProtobufConvertManager.getInstance().fetchReversedConvertor(clazz.getName()); + + return (T) pbConvertor.convert2Model(ob); + } + } catch (Throwable e) { + throw new ShouldNeverHappenException("GrpcSerializer deserialize error", e); + } + + return null; + } + + private String getTypeNameFromTypeUrl(String typeUri) { + int pos = typeUri.lastIndexOf('/'); + return pos == -1 ? "" : typeUri.substring(pos + 1); + } +} diff --git a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer b/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer index 71098c53674..f6fbf709dea 100644 --- a/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer +++ b/serializer/seata-serializer-protobuf/src/main/resources/META-INF/services/org.apache.seata.core.serializer.Serializer @@ -14,4 +14,5 @@ # See the License for the specific language governing permissions and # limitations under the License. # -org.apache.seata.serializer.protobuf.ProtobufSerializer \ No newline at end of file +org.apache.seata.serializer.protobuf.ProtobufSerializer +org.apache.seata.serializer.protobuf.GrpcSerializer \ No newline at end of file diff --git a/test/pom.xml b/test/pom.xml index 91d4c9f60fb..d35f25bad5e 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -40,6 +40,28 @@ true + + org.xolstice.maven.plugins + protobuf-maven-plugin + + ${project.basedir}/src/test/resources/protobuf/org/apache/seata/protocol/transcation/ + + com.google.protobuf:protoc:3.25.4:exe:${os.detected.classifier} + + grpc-java + + io.grpc:protoc-gen-grpc-java:1.66.0:exe:${os.detected.classifier} + + + + + + compile + compile-custom + + + + @@ -49,6 +71,10 @@ seata-tm ${project.version} + + io.grpc + grpc-alts + diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java new file mode 100644 index 00000000000..581a0e15f2d --- /dev/null +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/GrpcTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty.mockserver; + +import com.google.protobuf.Any; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.ConfigurationTestHelper; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.protocol.generated.GrpcMessageProto; +import org.apache.seata.core.rpc.netty.RmNettyRemotingClient; +import org.apache.seata.core.rpc.netty.TmNettyRemotingClient; +import org.apache.seata.mockserver.MockServer; +import org.apache.seata.serializer.protobuf.generated.*; +import org.apache.seata.core.protocol.generated.SeataServiceGrpc; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class GrpcTest { + + private static ManagedChannel channel; + + private static SeataServiceGrpc.SeataServiceStub seataServiceStub; + + @BeforeAll + public static void before() { + ConfigurationFactory.reload(); + ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, String.valueOf(ProtocolTestConstants.MOCK_SERVER_PORT)); + MockServer.start(ProtocolTestConstants.MOCK_SERVER_PORT); + TmNettyRemotingClient.getInstance().destroy(); + RmNettyRemotingClient.getInstance().destroy(); + + channel = ManagedChannelBuilder.forAddress("127.0.0.1", ProtocolTestConstants.MOCK_SERVER_PORT).usePlaintext().build(); + seataServiceStub = SeataServiceGrpc.newStub(channel); + } + + @AfterAll + public static void after() { + //MockServer.close(); + ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL); + TmNettyRemotingClient.getInstance().destroy(); + RmNettyRemotingClient.getInstance().destroy(); + } + + private GrpcMessageProto getRegisterTMRequest() { + AbstractIdentifyRequestProto abstractIdentifyRequestProto = AbstractIdentifyRequestProto.newBuilder() + .setApplicationId("test-applicationId") + .build(); + RegisterTMRequestProto registerTMRequestProto = RegisterTMRequestProto.newBuilder() + .setAbstractIdentifyRequest(abstractIdentifyRequestProto) + .build(); + + return GrpcMessageProto.newBuilder().setBody(Any.pack(registerTMRequestProto)).build(); + } + + private GrpcMessageProto getGlobalBeginRequest() { + GlobalBeginRequestProto globalBeginRequestProto = GlobalBeginRequestProto.newBuilder() + .setTransactionName("test-transaction") + .setTimeout(2000) + .build(); + return GrpcMessageProto.newBuilder().setBody(Any.pack(globalBeginRequestProto)).build(); + } + + private GrpcMessageProto getBranchRegisterRequest() { + BranchRegisterRequestProto branchRegisterRequestProto = BranchRegisterRequestProto.newBuilder() + .setXid("1") + .setLockKey("1") + .setResourceId("test-resource") + .setBranchType(BranchTypeProto.TCC) + .setApplicationData("{\"mock\":\"mock\"}") + .build(); + + return GrpcMessageProto.newBuilder().setBody(Any.pack(branchRegisterRequestProto)).build(); + } + + private GrpcMessageProto getGlobalCommitRequest() { + AbstractGlobalEndRequestProto globalEndRequestProto = AbstractGlobalEndRequestProto.newBuilder() + .setXid("1") + .build(); + GlobalCommitRequestProto globalCommitRequestProto = GlobalCommitRequestProto.newBuilder() + .setAbstractGlobalEndRequest(globalEndRequestProto) + .build(); + + return GrpcMessageProto.newBuilder().setBody(Any.pack(globalCommitRequestProto)).build(); + } + + private GrpcMessageProto getGlobalRollbackRequest() { + AbstractGlobalEndRequestProto globalEndRequestProto = AbstractGlobalEndRequestProto.newBuilder() + .setXid("1") + .build(); + GlobalRollbackRequestProto globalRollbackRequestProto = GlobalRollbackRequestProto.newBuilder() + .setAbstractGlobalEndRequest(globalEndRequestProto) + .build(); + + return GrpcMessageProto.newBuilder().setBody(Any.pack(globalRollbackRequestProto)).build(); + } + + @Test + public void testCommit() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(4); + StreamObserver streamObserver = new StreamObserver() { + @Override + public void onNext(GrpcMessageProto grpcMessageProto) { + System.out.println("receive : " + grpcMessageProto.toString()); + countDownLatch.countDown(); + } + + @Override + public void onError(Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void onCompleted() { + + } + }; + + StreamObserver response = seataServiceStub.sendRequest(streamObserver); + response.onNext(getRegisterTMRequest()); + response.onNext(getGlobalBeginRequest()); + response.onNext(getBranchRegisterRequest()); + response.onNext(getGlobalCommitRequest()); + + response.onCompleted(); + + countDownLatch.await(10, TimeUnit.SECONDS); + } + + @Test + public void testRollback() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(4); + StreamObserver streamObserver = new StreamObserver() { + @Override + public void onNext(GrpcMessageProto grpcMessageProto) { + System.out.println("receive : " + grpcMessageProto.toString()); + countDownLatch.countDown(); + } + + @Override + public void onError(Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void onCompleted() { + + } + }; + + StreamObserver response = seataServiceStub.sendRequest(streamObserver); + response.onNext(getRegisterTMRequest()); + response.onNext(getGlobalBeginRequest()); + response.onNext(getBranchRegisterRequest()); + response.onNext(getGlobalRollbackRequest()); + + response.onCompleted(); + + countDownLatch.await(10, TimeUnit.SECONDS); + } +} diff --git a/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto b/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto new file mode 100644 index 00000000000..cdf1b52f4a8 --- /dev/null +++ b/test/src/test/resources/protobuf/org/apache/seata/protocol/transcation/grpcMessage.proto @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; +package org.apache.seata.protocol.protobuf; +import "google/protobuf/any.proto"; +option java_multiple_files = true; +option java_outer_classname = "GrpcMessage"; +option java_package = "org.apache.seata.core.protocol.generated"; + +message GrpcMessageProto { + int32 id = 1; + int32 messageType = 2; + map headMap = 3; + google.protobuf.Any body = 4; +} + +service SeataService { + rpc sendRequest (stream GrpcMessageProto) returns (stream GrpcMessageProto); +} \ No newline at end of file