Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ public interface FederationRPCMBean {
*/
int getRpcClientNumActiveConnections();

/**
 * Get the number of idle RPC connections between the Router and the NNs.
 * NOTE(review): "idle" presumably means usable connections that no thread
 * is currently using; confirm against the connection pool implementation.
 *
 * @return Number of idle RPC connections between the Router and the NNs.
 */
int getRpcClientNumIdleConnections();

/**
 * Get the number of recently active RPC connections between
 * the Router and the NNs. Unlike the plain active count, this also covers
 * connections that were used a short while ago and have since been
 * released.
 *
 * @return Number of recently active RPC connections between
 *         the Router and the NNs.
 */
int getRpcClientNumActiveConnectionsRecently();

/**
* Get the number of RPC connections to be created.
* @return Number of RPC connections to be created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,16 @@ public int getRpcClientNumActiveConnections() {
return rpcServer.getRPCClient().getNumActiveConnections();
}

// Idle connection count, as reported by the RPC client of the RPC server.
@Override
public int getRpcClientNumIdleConnections() {
return rpcServer.getRPCClient().getNumIdleConnections();
}

// Recently active connection count, as reported by the RPC client of the
// RPC server.
@Override
public int getRpcClientNumActiveConnectionsRecently() {
return rpcServer.getRPCClient().getNumActiveConnectionsRecently();
}

@Override
public int getRpcClientNumCreatingConnections() {
return rpcServer.getRPCClient().getNumCreatingConnections();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
package org.apache.hadoop.hdfs.server.federation.router;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Context to track a connection in a {@link ConnectionPool}. When a client uses
Expand All @@ -36,13 +40,19 @@
*/
public class ConnectionContext {

private static final Logger LOG =
LoggerFactory.getLogger(ConnectionContext.class);

/** Client for the connection. */
private final ProxyAndInfo<?> client;
/** How many threads are using this connection. */
private int numThreads = 0;
/** If the connection is closed. */
private boolean closed = false;

/** Last timestamp the connection was active. */
private long lastActiveTs = 0;
/** The connection's active status would expire after this window. */
private final static long ACTIVE_WINDOW_TIME = TimeUnit.SECONDS.toMillis(30);

public ConnectionContext(ProxyAndInfo<?> connection) {
this.client = connection;
Expand All @@ -57,6 +67,16 @@ public synchronized boolean isActive() {
return this.numThreads > 0;
}

/**
 * Check if the connection is/was active recently.
 * A connection counts as recently active while at least one thread is
 * still using it, or when it was last handed out within the active window.
 *
 * @return True if the connection is active or
 *         was active in the past period of time.
 */
public synchronized boolean isActiveRecently() {
  // A call that outlasts the window still holds the connection, so it must
  // not be reported as inactive just because it was handed out long ago.
  if (this.numThreads > 0) {
    return true;
  }
  // NOTE(review): lastActiveTs starts at 0, so a connection that was never
  // handed out is compared against an arbitrary monotonic origin; confirm
  // whether that case needs an explicit "never used" marker.
  return Time.monotonicNow() - this.lastActiveTs <= ACTIVE_WINDOW_TIME;
}

/**
* Check if the connection is closed.
*
Expand All @@ -83,30 +103,41 @@ public synchronized boolean isUsable() {
*/
public synchronized ProxyAndInfo<?> getClient() {
  // Stamp the hand-out time and register one more user of this connection.
  lastActiveTs = Time.monotonicNow();
  numThreads += 1;
  return client;
}

/**
* Release this connection. If the connection was closed, close the proxy.
* Otherwise, mark the connection as not used by us anymore.
* Release this connection.
*/
public synchronized void release() {
if (--this.numThreads == 0 && this.closed) {
close();
if (this.numThreads > 0) {
this.numThreads--;
}
}

/**
* We will not use this connection anymore. If it's not being used, we close
* it. Otherwise, we let release() do it once we are done with it.
* Close a connection. Only idle connections can be closed since
* the RPC proxy would be shut down immediately.
*
* @param force whether the connection should be closed anyway.
*/
public synchronized void close() {
this.closed = true;
if (this.numThreads == 0) {
Object proxy = this.client.getProxy();
// Nobody should be using this anymore so it should close right away
RPC.stopProxy(proxy);
public synchronized void close(boolean force) {
  final boolean inUse = this.numThreads > 0;
  if (inUse && !force) {
    // Erroneous case: handlers still hold the connection. It is closed
    // anyway because it has already been moved out of the pool and would
    // leak otherwise.
    LOG.error("Active connection with {} handlers will be closed",
        this.numThreads);
  }
  this.closed = true;
  // Nobody should be using the proxy anymore, so stop it right away.
  RPC.stopProxy(this.client.getProxy());
}

/**
 * Close this connection in non-forced mode, i.e. {@code close(false)}.
 * An error is logged if threads are still using the connection; the proxy
 * is stopped either way.
 */
public synchronized void close() {
close(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,42 @@ public int getNumActiveConnections() {
return total;
}

/**
 * Get the number of idle connections, summed over all connection pools.
 *
 * @return Number of idle connections.
 */
public int getNumIdleConnections() {
  readLock.lock();
  try {
    // Add up the idle count of every pool while holding the read lock.
    int idle = 0;
    for (ConnectionPool connectionPool : this.pools.values()) {
      idle += connectionPool.getNumIdleConnections();
    }
    return idle;
  } finally {
    readLock.unlock();
  }
}

/**
 * Get the number of recently active connections, summed over all
 * connection pools.
 *
 * @return Number of recently active connections.
 */
public int getNumActiveConnectionsRecently() {
  readLock.lock();
  try {
    // Add up the recently-active count of every pool under the read lock.
    int recentlyActive = 0;
    for (ConnectionPool connectionPool : this.pools.values()) {
      recentlyActive += connectionPool.getNumActiveConnectionsRecently();
    }
    return recentlyActive;
  } finally {
    readLock.unlock();
  }
}

/**
* Get the number of connections to be created.
*
Expand Down Expand Up @@ -327,12 +363,21 @@ void cleanup(ConnectionPool pool) {
// Check if the pool hasn't been active in a while or not 50% are used
long timeSinceLastActive = Time.now() - pool.getLastActiveTime();
int total = pool.getNumConnections();
int active = pool.getNumActiveConnections();
// Active is a transient status in many cases for a connection since
// the handler thread uses the connection very quickly. Thus the number
// of connections with handlers using at the call time is constantly low.
// Recently active is more lasting status and it shows how many
// connections have been used with a recent time period. (i.e. 30 seconds)
int active = pool.getNumActiveConnectionsRecently();
float poolMinActiveRatio = pool.getMinActiveRatio();
if (timeSinceLastActive > connectionCleanupPeriodMs ||
active < poolMinActiveRatio * total) {
// Remove and close 1 connection
List<ConnectionContext> conns = pool.removeConnections(1);
// Be greedy here to close as many connections as possible in one shot
// The number should at least be 1
int targetConnectionsCount = Math.max(1,
(int)(poolMinActiveRatio * total) - active);
List<ConnectionContext> conns =
pool.removeConnections(targetConnectionsCount);
for (ConnectionContext conn : conns) {
conn.close();
}
Expand Down Expand Up @@ -414,7 +459,7 @@ public void run() {
ConnectionPool pool = this.queue.take();
try {
int total = pool.getNumConnections();
int active = pool.getNumActiveConnections();
int active = pool.getNumActiveConnectionsRecently();
float poolMinActiveRatio = pool.getMinActiveRatio();
if (pool.getNumConnections() < pool.getMaxSize() &&
active >= poolMinActiveRatio * total) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,19 +252,23 @@ public synchronized void addConnection(ConnectionContext conn) {
*/
public synchronized List<ConnectionContext> removeConnections(int num) {
List<ConnectionContext> removed = new LinkedList<>();

// Remove and close the last connection
List<ConnectionContext> tmpConnections = new ArrayList<>();
for (int i=0; i<this.connections.size(); i++) {
ConnectionContext conn = this.connections.get(i);
if (i < this.minSize || i < this.connections.size() - num) {
tmpConnections.add(conn);
} else {
removed.add(conn);
if (this.connections.size() > this.minSize) {
int targetCount = Math.min(num, this.connections.size() - this.minSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, connections is not thread safe, so is it possible targetCount is negative here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it can be negative here, since the only place where connections shrinks is in this function, at the swap with tmpConnections. The only other place where this variable gets assigned is in the creation part, and that can only increase its size.

// Remove and close targetCount of connections
List<ConnectionContext> tmpConnections = new ArrayList<>();
for (int i = 0; i < this.connections.size(); i++) {
ConnectionContext conn = this.connections.get(i);
// Only pick idle connections to close
if (removed.size() < targetCount && conn.isUsable()) {
removed.add(conn);
} else {
tmpConnections.add(conn);
}
}
this.connections = tmpConnections;
}
this.connections = tmpConnections;

LOG.debug("Expected to remove {} connection " +
"and actually removed {} connections", num, removed.size());
return removed;
}

Expand All @@ -278,7 +282,7 @@ protected synchronized void close() {
this.connectionPoolId, timeSinceLastActive);

for (ConnectionContext connection : this.connections) {
connection.close();
connection.close(true);
}
this.connections.clear();
}
Expand Down Expand Up @@ -309,6 +313,39 @@ protected int getNumActiveConnections() {
return ret;
}

/**
 * Number of idle connections in the pool, i.e. connections that report
 * themselves as usable.
 *
 * @return Number of idle connections
 */
protected int getNumIdleConnections() {
  // Iterate over the list reference read once at the start of the count.
  final List<ConnectionContext> snapshot = this.connections;
  int idle = 0;
  for (ConnectionContext context : snapshot) {
    if (!context.isUsable()) {
      continue;
    }
    idle++;
  }
  return idle;
}

/**
 * Number of connections in the pool that were active recently.
 *
 * @return Number of recently active connections.
 */
protected int getNumActiveConnectionsRecently() {
  // Iterate over the list reference read once at the start of the count.
  final List<ConnectionContext> snapshot = this.connections;
  int recentlyActive = 0;
  for (ConnectionContext context : snapshot) {
    if (!context.isActiveRecently()) {
      continue;
    }
    recentlyActive++;
  }
  return recentlyActive;
}

/**
* Get the last time the connection pool was used.
*
Expand All @@ -331,12 +368,18 @@ public String toString() {
public String getJSON() {
final Map<String, String> info = new LinkedHashMap<>();
info.put("active", Integer.toString(getNumActiveConnections()));
info.put("recent_active",
Integer.toString(getNumActiveConnectionsRecently()));
info.put("idle", Integer.toString(getNumIdleConnections()));
info.put("total", Integer.toString(getNumConnections()));
if (LOG.isDebugEnabled()) {
  // Per-connection details are only reported when debug logging is on.
  List<ConnectionContext> tmpConnections = this.connections;
  for (int i = 0; i < tmpConnections.size(); i++) {
    ConnectionContext connection = tmpConnections.get(i);
    info.put(i + " active", Boolean.toString(connection.isActive()));
    // Report this connection's own status, like the other per-index
    // entries, rather than repeating the pool-wide count on every row.
    info.put(i + " recent_active",
        Boolean.toString(connection.isActiveRecently()));
    info.put(i + " idle", Boolean.toString(connection.isUsable()));
    info.put(i + " closed", Boolean.toString(connection.isClosed()));
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,24 @@ public int getNumActiveConnections() {
return this.connectionManager.getNumActiveConnections();
}

/**
 * Total number of idle sockets between the router and NNs, as reported by
 * the connection manager.
 *
 * @return Number of idle namenode clients.
 */
public int getNumIdleConnections() {
return this.connectionManager.getNumIdleConnections();
}

/**
 * Total number of recently active sockets between the router and NNs, as
 * reported by the connection manager.
 *
 * @return Number of recently active namenode clients.
 */
public int getNumActiveConnectionsRecently() {
return this.connectionManager.getNumActiveConnectionsRecently();
}

/**
* Total number of open connection pools to a NN. Each connection pool.
* represents one user + one NN.
Expand Down
Loading