Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,14 @@ public void routerFailureReadOnly() {
}
}

@Override
public void recordAsyncQueueSize(String nsId, int queueSize) {
if (nameserviceRPCMetricsMap != null &&
nameserviceRPCMetricsMap.containsKey(nsId)) {
nameserviceRPCMetricsMap.get(nsId).setAsyncHandlerQueueSize(queueSize);
}
}

@Override
public void routerFailureLocked() {
if (metrics != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableRate;

import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -55,6 +56,8 @@ public class NameserviceRPCMetrics implements NameserviceRPCMBean {
private MutableCounterLong proxyOpPermitRejected;
@Metric("Number of operations accepted to hit a namenode")
private MutableCounterLong proxyOpPermitAccepted;
@Metric("Async Queue Size")
private MutableGaugeInt asyncHandlerQueueSize;

public NameserviceRPCMetrics(Configuration conf, String nsId) {
this.nsId = NAMESERVICE_RPC_METRICS_PREFIX + nsId;
Expand Down Expand Up @@ -116,6 +119,10 @@ public long getProxyOpPermitAccepted() {
return proxyOpPermitAccepted.value();
}

public void setAsyncHandlerQueueSize(int size) {
asyncHandlerQueueSize.set(size);
}

/**
* Add the time to proxy an operation from the moment the Router sends it to
* the Namenode until it replied.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final String DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "handler.count";
public static final int DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "queue.size";
public static final int DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT = 1000;
public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count";
public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,12 @@ void init(
* If a path is in a read only mount point.
*/
void routerFailureReadOnly();

/**
* Records the size of the async handler queue for the given namespace.
*
* @param nsId the namespace identifier
* @param queueSize the current size of the async queue
*/
void recordAsyncQueueSize(String nsId, int queueSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
Expand Down Expand Up @@ -58,7 +60,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
Expand All @@ -70,7 +71,9 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -292,11 +295,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
private RouterRenameOption routerRenameOption;
/** Schedule the router federation rename jobs. */
private BalanceProcedureScheduler fedRenameScheduler;
private boolean enableAsync;
private Map<String, Integer> nsAsyncHandlerCount = new ConcurrentHashMap<>();
private Map<String, ExecutorService> asyncRouterHandlerExecutors = new ConcurrentHashMap<>();
private final boolean enableAsync;
private final Map<String, ThreadPoolExecutor> asyncRouterHandlerExecutors = new ConcurrentHashMap<>();
private ThreadPoolExecutor routerDefaultAsyncHandlerExecutor;
private ExecutorService routerAsyncResponderExecutor;
private ExecutorService routerDefaultAsyncHandlerExecutor;

/**
* Construct a router RPC server.
Expand Down Expand Up @@ -504,31 +506,38 @@ public RouterRpcServer(Configuration conf, Router router,
*/
public void initAsyncThreadPools(Configuration configuration) {
LOG.info("Begin initialize asynchronous handler and responder thread pool.");
initNsAsyncHandlerCount();
Map<String, Integer> nsAsyncHandlerCount = parseNsAsyncHandlerCount(configuration);
Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(configuration);
Set<String> unassignedNS = new HashSet<>();
allConfiguredNS.add(CONCURRENT_NS);

int asyncQueueSize = configuration.getInt(DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE,
DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT);
if (asyncQueueSize <= 1) {
throw new IllegalArgumentException("Async queue size must be greater than 1");
}
int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
for (String nsId : allConfiguredNS) {
int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
if (dedicatedHandlers <= 0) {
dedicatedHandlers = asyncHandlerCountDefault;
LOG.info("Use default async handler count {} for ns {}.", asyncHandlerCountDefault, nsId);
} else {
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
}

if (dedicatedHandlers > 0) {
initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
int finalDedicatedHandlers = dedicatedHandlers;
asyncRouterHandlerExecutors.computeIfAbsent(nsId,
id -> initAsyncHandlerThreadPools4Ns(id, asyncQueueSize, finalDedicatedHandlers));
LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, nsId);
} else {
unassignedNS.add(nsId);
}
}

int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);

if (!unassignedNS.isEmpty()) {
LOG.warn("Async handler unassigned ns: {}", unassignedNS);
LOG.info("Use default async handler count {} for unassigned ns.", asyncHandlerCountDefault);
for (String nsId : unassignedNS) {
initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
}
if (routerDefaultAsyncHandlerExecutor == null) {
LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault);
routerDefaultAsyncHandlerExecutor = initAsyncHandlerThreadPools4Ns(
"default", asyncQueueSize, asyncHandlerCountDefault);
}

int asyncResponderCount = configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
Expand All @@ -539,17 +548,35 @@ public void initAsyncThreadPools(Configuration configuration) {
asyncResponderCount, new AsyncThreadFactory("Router Async Responder #"));
}
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
}

if (routerDefaultAsyncHandlerExecutor == null) {
LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault);
routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
asyncHandlerCountDefault, new AsyncThreadFactory("Router Async Default Handler #"));
private ThreadPoolExecutor initAsyncHandlerThreadPools4Ns(String ns, int asyncQueueSize, int handlerCount) {
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(asyncQueueSize);
return new ThreadPoolExecutor(handlerCount, handlerCount,
0L, TimeUnit.MILLISECONDS, queue,
new AsyncThreadFactory("Router Async Handler for " + ns + " #"));
}

/**
* Returns the asynchronous executor for the specified namespace.
* If no executor is configured for the given namespace ID, returns the default executor.
*
* @param nsId the namespace identifier
* @return the corresponding ExecutorService
*/
public ThreadPoolExecutor getAsyncExecutorForNamespace(String nsId) {
ThreadPoolExecutor executorService = asyncRouterHandlerExecutors.getOrDefault(
nsId, routerDefaultAsyncHandlerExecutor);
if (rpcMonitor != null) {
rpcMonitor.recordAsyncQueueSize(nsId, executorService.getQueue().size());
}
return executorService;
}

private void initNsAsyncHandlerCount() {
private Map<String, Integer> parseNsAsyncHandlerCount(Configuration conf) {
String configNsHandler = conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
Map<String, Integer> nsAsyncHandlerCount = new ConcurrentHashMap<>();
if (StringUtils.isEmpty(configNsHandler)) {
LOG.error(
"The value of config key: {} is empty. Will use default conf.",
Expand All @@ -566,11 +593,7 @@ private void initNsAsyncHandlerCount() {
}
nsAsyncHandlerCount.put(nsHandlerItems[0], Integer.parseInt(nsHandlerItems[1]));
}
}

private void initAsyncHandlerThreadPools4Ns(String nsId, int dedicatedHandlers) {
asyncRouterHandlerExecutors.computeIfAbsent(nsId, id -> Executors.newFixedThreadPool(
dedicatedHandlers, new AsyncThreadFactory("Router Async Handler for " + id + " #")));
return nsAsyncHandlerCount;
}

/**
Expand Down Expand Up @@ -2489,14 +2512,6 @@ public boolean isAsync() {
return this.enableAsync;
}

public Map<String, ExecutorService> getAsyncRouterHandlerExecutors() {
return asyncRouterHandlerExecutors;
}

public ExecutorService getRouterAsyncHandlerDefaultExecutor() {
return routerDefaultAsyncHandlerExecutor;
}

private static class AsyncThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger threadNumber = new AtomicInteger(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
Expand Down Expand Up @@ -172,26 +174,33 @@ public Object invokeMethod(
" with params " + Arrays.deepToString(params) + " from "
+ router.getRouterId());
}
String nsid = namenodes.get(0).getNameserviceId();
String nsId = namenodes.get(0).getNameserviceId();
ThreadPoolExecutor executor = router.getRpcServer().getAsyncExecutorForNamespace(nsId);
int queueSize = executor.getQueue().size();
// transfer threadLocalContext to worker threads of executor.
ThreadLocalContext threadLocalContext = new ThreadLocalContext();
asyncComplete(null);
asyncApplyUseExecutor((AsyncApplyFunction<Object, Object>) o -> {
asyncTry(() -> asyncApplyUseExecutor((AsyncApplyFunction<Object, Object>) o -> {
if (LOG.isDebugEnabled()) {
LOG.debug("Async invoke method : {}, {}, {}, {}", method.getName(), useObserver,
namenodes.toString(), params);
namenodes, params);
}
threadLocalContext.transfer();
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
acquirePermit(nsid, ugi, method.getName(), controller);
acquirePermit(nsId, ugi, method.getName(), controller);
invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes,
useObserver, protocol, method, params);
asyncFinally(object -> {
releasePermit(nsid, ugi, method, controller);
releasePermit(nsId, ugi, method, controller);
return object;
});
}, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid,
router.getRpcServer().getRouterAsyncHandlerDefaultExecutor()));
}, executor));

// Convert RejectedExecutionException to StandbyException since
asyncCatch((ret, e) -> {
LOG.warn("Async handler queue is full for namespace '{}'. Current queue size: {}", nsId, queueSize);
throw new StandbyException("Namespace '" + nsId + "' is overloaded (queue size: " + queueSize + ")");
}, RejectedExecutionException.class);
return null;
}

Expand Down