Skip to content

Commit

Permalink
support change grpc maxInboundMessageSize (#1333)
Browse files Browse the repository at this point in the history
Co-authored-by: liujianjun.ljj <liujianjun.ljj@antgroup.com>
  • Loading branch information
EvenLjj and liujianjun.ljj authored Jun 14, 2023
1 parent bf8a17d commit 3338957
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,10 @@ public class RpcOptions {
* 默认IO的buffer大小
*/
public static final String TRANSPORT_BUFFER_SIZE = "transport.buffer.size";
/**
* 默认 grpc maxInboundMessageSize大小
*/
public static final String TRANSPORT_GRPC_MAX_INBOUND_MESSAGE_SIZE = "transport.grpc.maxInboundMessageSize";
/**
* 最大IO的buffer大小
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ PS:大家也看到了,本JSON文档是支持注释的,而标准JSON是不支
"transport.use.epoll": false,
//默认数据包大小 8*1024*1024
"transport.payload.max": 8388608,
//默认grpc数据包大小 4*1024*1024
"transport.grpc.maxInboundMessageSize": 4194304,
// 客户端io线程数,默认 max(4,cpu+1)
"transport.client.io.threads": 0,
// 即I/O操作和用户自定义任务的执行时间比为1:1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.alipay.sofa.rpc.server.triple;

import com.alipay.sofa.rpc.common.RpcConfigs;
import com.alipay.sofa.rpc.common.RpcOptions;
import com.alipay.sofa.rpc.common.cache.ReflectCache;
import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator;
Expand Down Expand Up @@ -138,6 +140,7 @@ public void init(ServerConfig serverConfig) {
.workerEventLoopGroup(constructWorkerEventLoopGroup())
.executor(bizThreadPool)
.channelType(constructChannel())
.maxInboundMessageSize(RpcConfigs.getIntValue(RpcOptions.TRANSPORT_GRPC_MAX_INBOUND_MESSAGE_SIZE))
.build();
this.lock = new ReentrantLock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.alipay.sofa.rpc.transport.triple;

import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.RpcConfigs;
import com.alipay.sofa.rpc.common.RpcOptions;
import com.alipay.sofa.rpc.common.utils.NetUtils;
import com.alipay.sofa.rpc.context.RpcInternalContext;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
Expand Down Expand Up @@ -275,6 +277,7 @@ private ManagedChannel initChannel(ProviderInfo url) {
builder.usePlaintext();
builder.disableRetry();
builder.intercept(clientHeaderClientInterceptor);
builder.maxInboundMessageSize(RpcConfigs.getIntValue(RpcOptions.TRANSPORT_GRPC_MAX_INBOUND_MESSAGE_SIZE));
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ public interface SampleService {

String hello(String name);

String messageSize(String msg, int responseSize);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.test.triple;

/**
* @author Even
* @date 2023/6/13 11:33 AM
*/
public class SampleServiceImpl implements SampleService {

@Override
public String hello(String name) {
return "Hello! " + name;
}

@Override
public String messageSize(String msg, int responseMessageSize) {
if (responseMessageSize > 0) {
StringBuilder sb = new StringBuilder();
/* 1KB */
for (int i = 0; i < responseMessageSize * 1024; i++) {
sb.append('a');
}
return sb.toString();
}
return msg;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package com.alipay.sofa.rpc.test.triple;

import com.alipay.sofa.rpc.common.RpcConfigs;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.RpcOptions;
import com.alipay.sofa.rpc.config.ApplicationConfig;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
Expand Down Expand Up @@ -182,6 +184,11 @@ public void testSyncSampleService() {
public String hello(String name) {
return "Hello! " + name;
}

@Override
public String messageSize(String msg, int responseSize) {
return "";
}
})
.setServer(serverConfig);

Expand Down Expand Up @@ -291,6 +298,95 @@ public void testExposeTwoUniqueId() {
Assert.assertEquals("Hello2 world", greeterBlockingStub2.sayHello(request).getMessage());
}

@Test
public void testDefaultMessageSize() {
int originInboundMessageSize = RpcConfigs.getIntValue(RpcOptions.TRANSPORT_GRPC_MAX_INBOUND_MESSAGE_SIZE);
Assert.assertEquals(4194304, originInboundMessageSize);

ApplicationConfig applicationConfig = new ApplicationConfig().setAppName("triple-server1");
int port = 50052;
ServerConfig serverConfig = new ServerConfig()
.setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE)
.setPort(port);
ProviderConfig<SampleService> providerConfig = new ProviderConfig<SampleService>()
.setApplication(applicationConfig)
.setBootstrap(RpcConstants.PROTOCOL_TYPE_TRIPLE)
.setInterfaceId(SampleService.class.getName())
.setRef(new SampleServiceImpl())
.setServer(serverConfig);
providerConfig.export();

ConsumerConfig<SampleService> consumerConfig = new ConsumerConfig<>();
consumerConfig.setInterfaceId(SampleService.class.getName())
.setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE)
.setDirectUrl("tri://127.0.0.1:" + port);

SampleService sampleService = consumerConfig.refer();
String msg = buildMsg(1);
try {
sampleService.messageSize(msg, 5*1024);
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("gRPC message exceeds maximum size 4194304:"));
}
msg = buildMsg(5*1024);

try {
sampleService.messageSize(msg, 1);
Assert.fail();
} catch (Exception e) {
// The client actively cancelled the request, resulting in the server returning a CANCELLED error.
Assert.assertTrue(e.getMessage().contains("CANCELLED: HTTP/2 error code: CANCEL"));
}
}

@Test
public void testSetInboundMessageSize() {
int originInboundMessageSize = RpcConfigs.getIntValue(RpcOptions.TRANSPORT_GRPC_MAX_INBOUND_MESSAGE_SIZE);
Assert.assertEquals(4194304, originInboundMessageSize);
RpcConfigs.putValue(RpcOptions.TRANSPORT_GRPC_MAX_INBOUND_MESSAGE_SIZE, "8388608");
try {
ApplicationConfig applicationConfig = new ApplicationConfig().setAppName("triple-server1");
int port = 50052;
ServerConfig serverConfig = new ServerConfig()
.setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE)
.setPort(port);

ProviderConfig<SampleService> providerConfig = new ProviderConfig<SampleService>()
.setApplication(applicationConfig)
.setUniqueId("maxInbound")
.setBootstrap(RpcConstants.PROTOCOL_TYPE_TRIPLE)
.setInterfaceId(SampleService.class.getName())
.setRef(new SampleServiceImpl())
.setServer(serverConfig);
providerConfig.export();

ConsumerConfig<SampleService> consumerConfig = new ConsumerConfig<>();
consumerConfig.setInterfaceId(SampleService.class.getName())
.setUniqueId("maxInbound")
.setProtocol(RpcConstants.PROTOCOL_TYPE_TRIPLE)
.setDirectUrl("tri://127.0.0.1:" + port);
SampleService sampleService2 = consumerConfig.refer();
String msg = buildMsg(5 * 1024);
try {
sampleService2.messageSize(msg, 5 * 1024);
} catch (Exception e) {
Assert.fail();
}
} finally {
RpcConfigs.putValue(RpcOptions.TRANSPORT_GRPC_MAX_INBOUND_MESSAGE_SIZE, originInboundMessageSize);
}
}

private String buildMsg(int messageSize) {
StringBuilder sb = new StringBuilder();
// 1KB
for (int i = 0; i < messageSize * 1024; i++) {
sb.append('a');
}
return sb.toString();
}

@BeforeClass
public static void adBeforeClass() {
RpcRunningState.setUnitTestMode(true);
Expand Down

0 comments on commit 3338957

Please sign in to comment.