Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
public class AbstractRouterRpcFairnessPolicyController
implements RouterRpcFairnessPolicyController {

private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class);

/** Hash table to hold semaphore for each configured name service. */
Expand Down Expand Up @@ -64,6 +64,7 @@ public void releasePermit(String nsId) {

@Override
public void shutdown() {
LOG.debug("Shutting down router fairness policy controller");
// drain all semaphores
for (Semaphore sema: this.permits.values()) {
sema.drainPermits();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.federation.fairness;

import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.ipc.RefreshHandler;
import org.apache.hadoop.ipc.RefreshResponse;

public class RefreshFairnessPolicyControllerHandler implements RefreshHandler {

final static public String HANDLER_IDENTIFIER = "RefreshFairnessPolicyController";
private final Router router;

public RefreshFairnessPolicyControllerHandler(Router router) {
this.router = router;
}

@Override
public RefreshResponse handleRefresh(String identifier, String[] args) {
if (HANDLER_IDENTIFIER.equals(identifier)) {
return new RefreshResponse(0, router.getRpcServer().refreshFairnessPolicyController());
}
return new RefreshResponse(-1, "Failed");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler.HANDLER_IDENTIFIER;

import java.io.IOException;
import java.net.InetSocketAddress;
Expand All @@ -45,6 +46,7 @@
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.RouterPolicyProvider;
import org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
Expand Down Expand Up @@ -211,6 +213,8 @@ public RouterAdminServer(Configuration conf, Router router)
genericRefreshService, adminServer);
DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
refreshCallQueueService, adminServer);

registerRefreshFairnessPolicyControllerHandler();
}

/**
Expand Down Expand Up @@ -784,4 +788,9 @@ public void refreshCallQueue() throws IOException {
Configuration configuration = new Configuration();
router.getRpcServer().getServer().refreshCallQueue(configuration);
}

private void registerRefreshFairnessPolicyControllerHandler() {
RefreshRegistry.defaultRegistry()
.register(HANDLER_IDENTIFIER, new RefreshFairnessPolicyControllerHandler(router));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public class RouterRpcClient {
Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");

/** Fairness manager to control handlers assigned per NS. */
private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
private volatile RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
private Map<String, LongAdder> rejectedPermitsPerNs = new ConcurrentHashMap<>();
private Map<String, LongAdder> acceptedPermitsPerNs = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -451,7 +451,8 @@ private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
* @throws StandbyException If all Namenodes are in Standby.
* @throws IOException If it cannot invoke the method.
*/
private Object invokeMethod(
@VisibleForTesting
public Object invokeMethod(
final UserGroupInformation ugi,
final List<? extends FederationNamenodeContext> namenodes,
final Class<?> protocol, final Method method, final Object... params)
Expand Down Expand Up @@ -828,7 +829,8 @@ public Object invokeSingleBlockPool(final String bpId, RemoteMethod method)
public Object invokeSingle(final String nsId, RemoteMethod method)
throws IOException {
UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
acquirePermit(nsId, ugi, method);
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
acquirePermit(nsId, ugi, method, controller);
try {
List<? extends FederationNamenodeContext> nns =
getNamenodesForNameservice(nsId);
Expand All @@ -838,7 +840,7 @@ public Object invokeSingle(final String nsId, RemoteMethod method)
Object[] params = method.getParams(loc);
return invokeMethod(ugi, nns, proto, m, params);
} finally {
releasePermit(nsId, ugi, method);
releasePermit(nsId, ugi, method, controller);
}
}

Expand Down Expand Up @@ -989,14 +991,15 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
Class<T> expectedResultClass, Object expectedResultValue)
throws IOException {

RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
final Method m = remoteMethod.getMethod();
List<IOException> thrownExceptions = new ArrayList<>();
Object firstResult = null;
// Invoke in priority order
for (final RemoteLocationContext loc : locations) {
String ns = loc.getNameserviceId();
acquirePermit(ns, ugi, remoteMethod);
acquirePermit(ns, ugi, remoteMethod, controller);
List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns);
try {
Expand Down Expand Up @@ -1031,7 +1034,7 @@ public <R extends RemoteLocationContext, T> RemoteResult invokeSequential(
"Unexpected exception proxying API " + e.getMessage(), e);
thrownExceptions.add(ioe);
} finally {
releasePermit(ns, ugi, remoteMethod);
releasePermit(ns, ugi, remoteMethod, controller);
}
}

Expand Down Expand Up @@ -1356,7 +1359,8 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
// Shortcut, just one call
T location = locations.iterator().next();
String ns = location.getNameserviceId();
acquirePermit(ns, ugi, method);
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
acquirePermit(ns, ugi, method, controller);
final List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns);
try {
Expand All @@ -1369,7 +1373,7 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
// Localize the exception
throw processException(ioe, location);
} finally {
releasePermit(ns, ugi, method);
releasePermit(ns, ugi, method, controller);
}
}

Expand Down Expand Up @@ -1419,7 +1423,8 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
this.router.getRouterClientMetrics().incInvokedConcurrent(m);
}

acquirePermit(CONCURRENT_NS, ugi, method);
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
acquirePermit(CONCURRENT_NS, ugi, method, controller);
try {
List<Future<Object>> futures = null;
if (timeOutMs > 0) {
Expand Down Expand Up @@ -1477,7 +1482,7 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
throw new IOException(
"Unexpected error while invoking API " + ex.getMessage(), ex);
} finally {
releasePermit(CONCURRENT_NS, ugi, method);
releasePermit(CONCURRENT_NS, ugi, method, controller);
}
}

Expand Down Expand Up @@ -1558,13 +1563,14 @@ private String getNameserviceForBlockPoolId(final String bpId)
* @param nsId Identifier of the block pool.
* @param ugi UserGroupIdentifier associated with the user.
* @param m Remote method that needs to be invoked.
* @param controller fairness policy controller to acquire permit from
* @throws IOException If permit could not be acquired for the nsId.
*/
private void acquirePermit(
final String nsId, final UserGroupInformation ugi, final RemoteMethod m)
private void acquirePermit(final String nsId, final UserGroupInformation ugi,
final RemoteMethod m, RouterRpcFairnessPolicyController controller)
throws IOException {
if (routerRpcFairnessPolicyController != null) {
if (!routerRpcFairnessPolicyController.acquirePermit(nsId)) {
if (controller != null) {
if (!controller.acquirePermit(nsId)) {
// Throw StandByException,
// Clients could fail over and try another router.
if (rpcMonitor != null) {
Expand All @@ -1585,21 +1591,20 @@ private void acquirePermit(
/**
* Release permit for specific nsId after processing against downstream
* nsId is completed.
*
* @param nsId Identifier of the block pool.
* @param nsId Identifier of the block pool.
* @param ugi UserGroupIdentifier associated with the user.
* @param m Remote method that needs to be invoked.
* @param controller fairness policy controller to release permit from
*/
private void releasePermit(
final String nsId, final UserGroupInformation ugi, final RemoteMethod m) {
if (routerRpcFairnessPolicyController != null) {
routerRpcFairnessPolicyController.releasePermit(nsId);
private void releasePermit(final String nsId, final UserGroupInformation ugi,
final RemoteMethod m, RouterRpcFairnessPolicyController controller) {
if (controller != null) {
controller.releasePermit(nsId);
LOG.trace("Permit released for ugi: {} for method: {}", ugi,
m.getMethodName());
}
}

@VisibleForTesting
public RouterRpcFairnessPolicyController
getRouterRpcFairnessPolicyController() {
return routerRpcFairnessPolicyController;
Expand All @@ -1622,4 +1627,35 @@ public Long getAcceptedPermitForNs(String ns) {
return acceptedPermitsPerNs.containsKey(ns) ?
acceptedPermitsPerNs.get(ns).longValue() : 0L;
}

/**
* Refreshes/changes the fairness policy controller implementation if possible
* and returns the controller class name
* @param conf Configuration
* @return New controller class name if successfully refreshed, else old controller class name
*/
public synchronized String refreshFairnessPolicyController(Configuration conf) {
RouterRpcFairnessPolicyController newController;
try {
newController = FederationUtil.newFairnessPolicyController(conf);
} catch (RuntimeException e) {
LOG.error("Failed to create router fairness policy controller", e);
return getCurrentFairnessPolicyControllerClassName();
}

if (newController != null) {
if (routerRpcFairnessPolicyController != null) {
routerRpcFairnessPolicyController.shutdown();
}
routerRpcFairnessPolicyController = newController;
}
return getCurrentFairnessPolicyControllerClassName();
}

private String getCurrentFairnessPolicyControllerClassName() {
if (routerRpcFairnessPolicyController != null) {
return routerRpcFairnessPolicyController.getClass().getCanonicalName();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ public FileSubclusterResolver getSubclusterResolver() {
}

/**
* Get the active namenode resolver
* Get the active namenode resolver.
*
* @return Active namenode resolver.
*/
Expand Down Expand Up @@ -1990,6 +1990,10 @@ public int getSchedulerJobCount() {
return fedRenameScheduler.getAllJobs().size();
}

public String refreshFairnessPolicyController() {
return rpcClient.refreshFairnessPolicyController(new Configuration());
}

/**
* Deals with loading datanode report into the cache and refresh.
*/
Expand Down
Loading