Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
"ipc.maximum.data.length";
/** Default value for IPC_MAXIMUM_DATA_LENGTH. */
public static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 128 * 1024 * 1024;
/** Maximum number of stacked calls for one connection. **/
public static final String IPC_CONNECTION_MAXIMUM_STACKED_CALL =
"ipc.connection.maximum.stacked.call";
public static final long IPC_CONNECTION_MAXIMUM_STACKED_CALL_DEFAULT = 0;

/** Max response size a client will accept. */
public static final String IPC_MAXIMUM_RESPONSE_LENGTH =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ private class Connection extends Thread {
private Socket socket = null; // connected socket
private IpcStreams ipcStreams;
private final int maxResponseLength;
private final long maxStackedCallNumber;
private final int rpcTimeout;
private int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
Expand Down Expand Up @@ -460,6 +461,17 @@ private class Connection extends Thread {
this.maxResponseLength = remoteId.conf.getInt(
CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
long tmpMaxStackedCallNumber = remoteId.conf.getLong(
CommonConfigurationKeys.IPC_CONNECTION_MAXIMUM_STACKED_CALL,
CommonConfigurationKeys.IPC_CONNECTION_MAXIMUM_STACKED_CALL_DEFAULT);
if (tmpMaxStackedCallNumber < 0) {
LOG.warn("Invalid value {} configured for {} should be greater than or equal to 0. " +
"Using default value of : {} instead.", tmpMaxStackedCallNumber,
CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
tmpMaxStackedCallNumber = 0;
}
this.maxStackedCallNumber = tmpMaxStackedCallNumber;
this.rpcTimeout = remoteId.getRpcTimeout();
this.maxIdleTime = remoteId.getMaxIdleTime();
this.connectionRetryPolicy = remoteId.connectionRetryPolicy;
Expand Down Expand Up @@ -524,9 +536,14 @@ private void touch() {
* @param call to add
* @return true if the call was added.
*/
private synchronized boolean addCall(Call call) {
if (shouldCloseConnection.get())
private synchronized boolean addCall(Call call) throws IOException {
if (shouldCloseConnection.get()) {
return false;
}
if (this.maxStackedCallNumber > 0 && calls.size() >= this.maxStackedCallNumber) {
throw new IOException("Reject this request " + call.id + " due to stacked call number "
+ calls.size() + " is more than " + this.maxStackedCallNumber);
}
calls.put(call.id, call);
notify();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2614,6 +2614,15 @@ The switch to turn S3A auditing on or off.
</description>
</property>

<property>
<name>ipc.connection.maximum.stacked.call</name>
<value>0</value>
<description>This indicates the maximum number of call that can stacked in
one connection of the client. It is used to avoid possible OOMs in Server.
This setting should rarely need to be changed. Set to 0 to disable.
</description>
</property>

<!-- Proxy Configuration -->

<property>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1645,6 +1645,43 @@ public void testRpcResponseLimit() throws Throwable {
Assert.fail("didn't get limit exceeded");
}

@Test
public void testRpcLimitStackedCall() throws Throwable {
Server server = new TestServer(1, true);
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
final Configuration copyConf = new Configuration(conf);
copyConf.setInt(CommonConfigurationKeys.IPC_CONNECTION_MAXIMUM_STACKED_CALL, 1);
final AtomicInteger callReturned = new AtomicInteger(0);
final AtomicInteger callException = new AtomicInteger(0);

try (Client client = new Client(LongWritable.class, copyConf)) {
Thread[] threads = new Thread[2];
for (int i = 0; i < 2; i++) {
threads[i] = new Thread(new Runnable(){
@Override
public void run() {
try {
call(client, Thread.currentThread().getId(), addr, copyConf);
callReturned.incrementAndGet();
} catch (IOException e) {
LOG.error(e.toString());
callException.incrementAndGet();
}
}
});
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
}
assertEquals(1, callReturned.get());
assertEquals(1, callException.get());
}

@Test
public void testUserBinding() throws Exception {
checkUserBinding(false);
Expand Down