From bb35ec4b477abac7cefe03dcef2cde1327d5f43e Mon Sep 17 00:00:00 2001 From: Nguyen Cong Thanh Date: Tue, 12 Apr 2022 11:04:47 +0800 Subject: [PATCH 1/6] HDFS-16539. Add RefreshFairnessPolicyControllerHandler --- ...efreshFairnessPolicyControllerHandler.java | 41 ++++++ .../federation/router/RouterAdminServer.java | 9 ++ .../federation/router/RouterRpcClient.java | 53 ++++--- .../federation/router/RouterRpcServer.java | 5 + ...RouterRefreshFairnessPolicyController.java | 131 ++++++++++++++++++ 5 files changed, 220 insertions(+), 19 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RefreshFairnessPolicyControllerHandler.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RefreshFairnessPolicyControllerHandler.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RefreshFairnessPolicyControllerHandler.java new file mode 100644 index 0000000000000..f7bc0e8f5a6e0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RefreshFairnessPolicyControllerHandler.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.ipc.RefreshHandler; +import org.apache.hadoop.ipc.RefreshResponse; + +public class RefreshFairnessPolicyControllerHandler implements RefreshHandler { + + final static public String HANDLER_IDENTIFIER = "RefreshFairnessPolicyController"; + private final Router router; + + public RefreshFairnessPolicyControllerHandler(Router router) { + this.router = router; + } + + @Override + public RefreshResponse handleRefresh(String identifier, String[] args) { + if (HANDLER_IDENTIFIER.equals(identifier)) { + return new RefreshResponse(0, router.getRpcServer().refreshFairnessPolicyController()); + } + return new RefreshResponse(-1, "Failed"); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java index deb933ad5adb8..127470a1264ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY; +import static org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler.HANDLER_IDENTIFIER; import java.io.IOException; import java.net.InetSocketAddress; @@ -45,6 +46,7 @@ import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB; import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.RouterPolicyProvider; +import org.apache.hadoop.hdfs.server.federation.fairness.RefreshFairnessPolicyControllerHandler; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; @@ -211,6 +213,8 @@ public RouterAdminServer(Configuration conf, Router router) genericRefreshService, adminServer); DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class, refreshCallQueueService, adminServer); + + registerRefreshFairnessPolicyControllerHandler(); } /** @@ -784,4 +788,9 @@ public void refreshCallQueue() throws IOException { Configuration configuration = new Configuration(); router.getRpcServer().getServer().refreshCallQueue(configuration); } + + private void registerRefreshFairnessPolicyControllerHandler() { + RefreshRegistry.defaultRegistry() + .register(HANDLER_IDENTIFIER, new RefreshFairnessPolicyControllerHandler(router)); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 90d6c347ef73e..35039ce412b53 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -451,7 +451,8 @@ private RetryDecision shouldRetry(final IOException ioe, final int retryCount, * @throws StandbyException If all Namenodes are in Standby. * @throws IOException If it cannot invoke the method. */ - private Object invokeMethod( + @VisibleForTesting + public Object invokeMethod( final UserGroupInformation ugi, final List namenodes, final Class protocol, final Method method, final Object... params) @@ -828,7 +829,8 @@ public Object invokeSingleBlockPool(final String bpId, RemoteMethod method) public Object invokeSingle(final String nsId, RemoteMethod method) throws IOException { UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); - acquirePermit(nsId, ugi, method); + RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); + acquirePermit(nsId, ugi, method, controller); try { List nns = getNamenodesForNameservice(nsId); @@ -838,7 +840,7 @@ public Object invokeSingle(final String nsId, RemoteMethod method) Object[] params = method.getParams(loc); return invokeMethod(ugi, nns, proto, m, params); } finally { - releasePermit(nsId, ugi, method); + releasePermit(nsId, ugi, method, controller); } } @@ -989,6 +991,7 @@ public RemoteResult invokeSequential( Class expectedResultClass, Object expectedResultValue) throws IOException { + RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); final Method m = remoteMethod.getMethod(); List thrownExceptions = new ArrayList<>(); @@ -996,7 +999,7 @@ public RemoteResult invokeSequential( // Invoke in priority order for (final RemoteLocationContext loc : locations) { String ns = loc.getNameserviceId(); - acquirePermit(ns, ugi, remoteMethod); + acquirePermit(ns, ugi, remoteMethod, controller); List namenodes = getNamenodesForNameservice(ns); try { @@ -1031,7 +1034,7 @@ public RemoteResult invokeSequential( "Unexpected exception proxying API " + e.getMessage(), e); thrownExceptions.add(ioe); } finally { - releasePermit(ns, ugi, remoteMethod); + releasePermit(ns, ugi, remoteMethod, controller); } } @@ -1356,7 +1359,8 @@ public Map invokeConcurrent( // Shortcut, just one call T location = locations.iterator().next(); String ns = location.getNameserviceId(); - acquirePermit(ns, ugi, method); + RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); + acquirePermit(ns, ugi, method, controller); final List namenodes = getNamenodesForNameservice(ns); try { @@ -1369,7 +1373,7 @@ public Map invokeConcurrent( // Localize the exception throw processException(ioe, location); } finally { - releasePermit(ns, ugi, method); + releasePermit(ns, ugi, method, controller); } } @@ -1419,7 +1423,8 @@ public Map invokeConcurrent( this.router.getRouterClientMetrics().incInvokedConcurrent(m); } - acquirePermit(CONCURRENT_NS, ugi, method); + RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); + acquirePermit(CONCURRENT_NS, ugi, method, controller); try { List> futures = null; if (timeOutMs > 0) { @@ -1477,7 +1482,7 @@ public Map invokeConcurrent( throw new IOException( "Unexpected error while invoking API " + ex.getMessage(), ex); } finally { - releasePermit(CONCURRENT_NS, ugi, method); + releasePermit(CONCURRENT_NS, ugi, method, controller); } } @@ -1558,13 +1563,14 @@ private String getNameserviceForBlockPoolId(final String bpId) * @param nsId Identifier of the block pool. * @param ugi UserGroupIdentifier associated with the user. * @param m Remote method that needs to be invoked. + * @param controller fairness policy controller to acquire permit from * @throws IOException If permit could not be acquired for the nsId. */ - private void acquirePermit( - final String nsId, final UserGroupInformation ugi, final RemoteMethod m) + private void acquirePermit(final String nsId, final UserGroupInformation ugi, + final RemoteMethod m, RouterRpcFairnessPolicyController controller) throws IOException { - if (routerRpcFairnessPolicyController != null) { - if (!routerRpcFairnessPolicyController.acquirePermit(nsId)) { + if (controller != null) { + if (!controller.acquirePermit(nsId)) { // Throw StandByException, // Clients could fail over and try another router. if (rpcMonitor != null) { @@ -1585,15 +1591,15 @@ private void acquirePermit( /** * Release permit for specific nsId after processing against downstream * nsId is completed. - * - * @param nsId Identifier of the block pool. + * @param nsId Identifier of the block pool. * @param ugi UserGroupIdentifier associated with the user. * @param m Remote method that needs to be invoked. + * @param controller fairness policy controller to release permit from */ - private void releasePermit( - final String nsId, final UserGroupInformation ugi, final RemoteMethod m) { - if (routerRpcFairnessPolicyController != null) { - routerRpcFairnessPolicyController.releasePermit(nsId); + private void releasePermit(final String nsId, final UserGroupInformation ugi, + final RemoteMethod m, RouterRpcFairnessPolicyController controller) { + if (controller != null) { + controller.releasePermit(nsId); LOG.trace("Permit released for ugi: {} for method: {}", ugi, m.getMethodName()); } @@ -1622,4 +1628,13 @@ public Long getAcceptedPermitForNs(String ns) { return acceptedPermitsPerNs.containsKey(ns) ? acceptedPermitsPerNs.get(ns).longValue() : 0L; } + + public synchronized String refreshFairnessPolicyController(Configuration conf) { + if (routerRpcFairnessPolicyController != null) { + routerRpcFairnessPolicyController.shutdown(); + } + routerRpcFairnessPolicyController = + FederationUtil.newFairnessPolicyController(conf); + return routerRpcFairnessPolicyController.getClass().getCanonicalName(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 2b6c4a1f2f4c7..154c6e6d908c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -1990,6 +1990,11 @@ public int getSchedulerJobCount() { return fedRenameScheduler.getAllJobs().size(); } + public String refreshFairnessPolicyController() { + Configuration conf = new Configuration(); + return rpcClient.refreshFairnessPolicyController(conf); + } + /** * Deals with loading datanode report into the cache and refresh. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java new file mode 100644 index 0000000000000..f0b56ae11a2f2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import java.io.IOException; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; + +import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestRouterRefreshFairnessPolicyController { + + private static final Logger LOG = + LoggerFactory.getLogger(TestRouterRefreshFairnessPolicyController.class); + + private StateStoreDFSCluster cluster; + + @After + public void cleanup() { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + @Before + public void setupCluster() throws Exception { + cluster = new StateStoreDFSCluster(false, 1); + Configuration conf = new RouterConfigBuilder().stateStore().rpc().build(); + + // Handlers concurrent:ns0 = 3:3 + conf.setClass(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + StaticRouterRpcFairnessPolicyController.class, RouterRpcFairnessPolicyController.class); + conf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 6); + + // Datanodes not needed for this test. + cluster.setNumDatanodesPerNameservice(0); + + cluster.addRouterOverrides(conf); + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + } + + @Test + public void testRefreshStaticChangeHandlers() throws Exception { + MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter(); + RemoteMethod dummyMethod = Mockito.mock(RemoteMethod.class); + RouterRpcClient client = Mockito.spy(routerContext.getRouterRpcClient()); + final long sleepTime = 3000; + Mockito.doAnswer(invocationOnMock -> { + Thread.sleep(sleepTime); + return null; + }).when(client) + .invokeMethod(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + + final int N = 3; + Thread[] threadAcquirePermits = new Thread[N]; + for (int i = 0; i < N; i++) { + Thread threadAcquirePermit = new Thread(() -> { + try { + client.invokeSingle("ns0", dummyMethod); + } catch (IOException e) { + e.printStackTrace(); + } + }); + threadAcquirePermits[i] = threadAcquirePermit; + threadAcquirePermits[i].start(); + } + + Thread.sleep(1000); + + Configuration conf = routerContext.getConf(); + final int newNs0Permits = 1; // Set to smaller than current handler count (3) + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns0", newNs0Permits); + Thread threadRefreshController = new Thread(() -> { + client.refreshFairnessPolicyController(routerContext.getConf()); + }); + threadRefreshController.start(); + threadRefreshController.join(); + + StaticRouterRpcFairnessPolicyController controller = + (StaticRouterRpcFairnessPolicyController) client.getRouterRpcFairnessPolicyController(); + for (int i = 0; i < N; i++) { + threadAcquirePermits[i].join(); + } + + // Controller should now have 5:1 handlers for concurrent:ns0 + for (int i = 0; i < 5; i++) { + assertTrue(controller.acquirePermit(CONCURRENT_NS)); + } + // Invocations before refresh should not interfere with invocations after + assertTrue(controller.acquirePermit("ns0")); + + // Acquiring a permit on any ns now will fail due to overload + assertFalse(controller.acquirePermit(CONCURRENT_NS)); + assertFalse(controller.acquirePermit("ns0")); + } +} From c6694d92bf1e06d218f1a8ef78020bc3d408706f Mon Sep 17 00:00:00 2001 From: Nguyen Cong Thanh Date: Thu, 14 Apr 2022 14:36:29 +0800 Subject: [PATCH 2/6] Minor change --- .../hdfs/server/federation/router/RouterRpcClient.java | 9 ++------- .../hdfs/server/federation/router/RouterRpcServer.java | 3 +-- .../TestRouterRefreshFairnessPolicyController.java | 8 ++++---- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 35039ce412b53..feed58693c65e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1605,7 +1605,6 @@ private void releasePermit(final String nsId, final UserGroupInformation ugi, } } - @VisibleForTesting public RouterRpcFairnessPolicyController getRouterRpcFairnessPolicyController() { return routerRpcFairnessPolicyController; @@ -1629,12 +1628,8 @@ public Long getAcceptedPermitForNs(String ns) { acceptedPermitsPerNs.get(ns).longValue() : 0L; } - public synchronized String refreshFairnessPolicyController(Configuration conf) { - if (routerRpcFairnessPolicyController != null) { - routerRpcFairnessPolicyController.shutdown(); - } - routerRpcFairnessPolicyController = - FederationUtil.newFairnessPolicyController(conf); + public String refreshFairnessPolicyController(Configuration conf) { + routerRpcFairnessPolicyController = FederationUtil.newFairnessPolicyController(conf); return routerRpcFairnessPolicyController.getClass().getCanonicalName(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 154c6e6d908c5..0287c9b3d3339 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -1991,8 +1991,7 @@ public int getSchedulerJobCount() { } public String refreshFairnessPolicyController() { - Configuration conf = new Configuration(); - return rpcClient.refreshFairnessPolicyController(conf); + return rpcClient.refreshFairnessPolicyController(new Configuration()); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java index f0b56ae11a2f2..f5d06be0a6a38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java @@ -86,9 +86,9 @@ public void testRefreshStaticChangeHandlers() throws Exception { }).when(client) .invokeMethod(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); - final int N = 3; - Thread[] threadAcquirePermits = new Thread[N]; - for (int i = 0; i < N; i++) { + final int nThreads = 3; + Thread[] threadAcquirePermits = new Thread[nThreads]; + for (int i = 0; i < nThreads; i++) { Thread threadAcquirePermit = new Thread(() -> { try { client.invokeSingle("ns0", dummyMethod); @@ -113,7 +113,7 @@ public void testRefreshStaticChangeHandlers() throws Exception { StaticRouterRpcFairnessPolicyController controller = (StaticRouterRpcFairnessPolicyController) client.getRouterRpcFairnessPolicyController(); - for (int i = 0; i < N; i++) { + for (int i = 0; i < nThreads; i++) { threadAcquirePermits[i].join(); } From 1195a372bbcec0dbedc60b76f36adf2d10ecce8a Mon Sep 17 00:00:00 2001 From: Nguyen Cong Thanh Date: Mon, 18 Apr 2022 12:10:00 +0800 Subject: [PATCH 3/6] Minor change --- .../hadoop/hdfs/server/federation/router/RouterRpcClient.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index feed58693c65e..c818e3f8f74c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1629,6 +1629,9 @@ public Long getAcceptedPermitForNs(String ns) { } public String refreshFairnessPolicyController(Configuration conf) { + if (routerRpcFairnessPolicyController != null) { + routerRpcFairnessPolicyController.shutdown(); + } routerRpcFairnessPolicyController = FederationUtil.newFairnessPolicyController(conf); return routerRpcFairnessPolicyController.getClass().getCanonicalName(); } From ebc2594fa0fde2361892ca66b64a7d25ad180c8d Mon Sep 17 00:00:00 2001 From: Nguyen Cong Thanh Date: Thu, 21 Apr 2022 15:48:27 +0800 Subject: [PATCH 4/6] Change logic, add tests --- ...ractRouterRpcFairnessPolicyController.java | 3 +- .../federation/router/RouterRpcClient.java | 33 +++- ...RouterRefreshFairnessPolicyController.java | 167 ++++++++++++++---- 3 files changed, 165 insertions(+), 38 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java index 548f1a82f6d83..fe498c66b7ee8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java @@ -36,7 +36,7 @@ public class AbstractRouterRpcFairnessPolicyController implements RouterRpcFairnessPolicyController { - private static final Logger LOG = + public static final Logger LOG = LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class); /** Hash table to hold semaphore for each configured name service. */ @@ -64,6 +64,7 @@ public void releasePermit(String nsId) { @Override public void shutdown() { + LOG.debug("Shutting down router fairness policy controller"); // drain all semaphores for (Semaphore sema: this.permits.values()) { sema.drainPermits(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index c818e3f8f74c2..34a2c47c3ef29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -132,7 +132,7 @@ public class RouterRpcClient { Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)"); /** Fairness manager to control handlers assigned per NS. */ - private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController; + private volatile RouterRpcFairnessPolicyController routerRpcFairnessPolicyController; private Map rejectedPermitsPerNs = new ConcurrentHashMap<>(); private Map acceptedPermitsPerNs = new ConcurrentHashMap<>(); @@ -1628,11 +1628,34 @@ public Long getAcceptedPermitForNs(String ns) { acceptedPermitsPerNs.get(ns).longValue() : 0L; } - public String refreshFairnessPolicyController(Configuration conf) { + /** + * Refreshes/changes the fairness policy controller implementation if possible + * and returns the controller class name + * @param conf Configuration + * @return New controller class name if successfully refreshed, else old controller class name + */ + public synchronized String refreshFairnessPolicyController(Configuration conf) { + RouterRpcFairnessPolicyController newController; + try { + newController = FederationUtil.newFairnessPolicyController(conf); + } catch (RuntimeException e) { + LOG.error("Failed to create router fairness policy controller", e); + return getCurrentFairnessPolicyControllerClassName(); + } + + if (newController != null) { + if (routerRpcFairnessPolicyController != null) { + routerRpcFairnessPolicyController.shutdown(); + } + routerRpcFairnessPolicyController = newController; + } + return getCurrentFairnessPolicyControllerClassName(); + } + + private String getCurrentFairnessPolicyControllerClassName() { if (routerRpcFairnessPolicyController != null) { - routerRpcFairnessPolicyController.shutdown(); + return routerRpcFairnessPolicyController.getClass().getCanonicalName(); } - routerRpcFairnessPolicyController = FederationUtil.newFairnessPolicyController(conf); - return routerRpcFairnessPolicyController.getClass().getCanonicalName(); + return null; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java index f5d06be0a6a38..75c2c12449065 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java @@ -19,14 +19,19 @@ package org.apache.hadoop.hdfs.server.federation.fairness; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.junit.After; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; @@ -34,19 +39,25 @@ import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; +import org.apache.hadoop.test.GenericTestUtils; -import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; public class TestRouterRefreshFairnessPolicyController { private static final Logger LOG = LoggerFactory.getLogger(TestRouterRefreshFairnessPolicyController.class); + private final GenericTestUtils.LogCapturer controllerLog = + GenericTestUtils.LogCapturer.captureLogs(AbstractRouterRpcFairnessPolicyController.LOG); private StateStoreDFSCluster cluster; + @BeforeClass + public static void setLogLevel() { + GenericTestUtils.setLogLevel(AbstractRouterRpcFairnessPolicyController.LOG, Level.DEBUG); + } + @After public void cleanup() { if (cluster != null) { @@ -57,13 +68,15 @@ public void cleanup() { @Before public void setupCluster() throws Exception { - cluster = new StateStoreDFSCluster(false, 1); + cluster = new StateStoreDFSCluster(false, 2); Configuration conf = new RouterConfigBuilder().stateStore().rpc().build(); // Handlers concurrent:ns0 = 3:3 conf.setClass(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, StaticRouterRpcFairnessPolicyController.class, RouterRpcFairnessPolicyController.class); - conf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 6); + conf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 9); + // Allow metrics + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, true); // Datanodes not needed for this test. cluster.setNumDatanodesPerNameservice(0); @@ -74,10 +87,75 @@ public void setupCluster() throws Exception { cluster.waitClusterUp(); } + @Test + public void testRefreshNonexistentHandlerClass() { + MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter(); + routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + "org.apache.hadoop.hdfs.server.federation.fairness.ThisControllerDoesNotExist"); + assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(), + routerContext.getRouterRpcClient() + .refreshFairnessPolicyController(routerContext.getConf())); + } + + @Test + public void testRefreshClassDoesNotImplementControllerInterface() { + MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter(); + routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + "org.apache.hadoop.hdfs.server.federation.fairness.TestRouterRefreshFairnessPolicyController"); + assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(), + routerContext.getRouterRpcClient() + .refreshFairnessPolicyController(routerContext.getConf())); + } + + @Test + public void testRefreshSuccessful() { + MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter(); + + routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + StaticRouterRpcFairnessPolicyController.class.getCanonicalName()); + assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(), + routerContext.getRouterRpcClient() + .refreshFairnessPolicyController(routerContext.getConf())); + + routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + NoRouterRpcFairnessPolicyController.class.getCanonicalName()); + assertEquals(NoRouterRpcFairnessPolicyController.class.getCanonicalName(), + routerContext.getRouterRpcClient() + .refreshFairnessPolicyController(routerContext.getConf())); + } + + @Test + public void testConcurrentRefreshRequests() throws InterruptedException { + MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter(); + RouterRpcClient client = Mockito.spy(routerContext.getRouterRpcClient()); + controllerLog.clearOutput(); + + // Spawn 100 concurrent refresh requests + Thread[] threads = new Thread[100]; + for (int i = 0; i < 100; i++) { + threads[i] = new Thread(() -> { + client.refreshFairnessPolicyController(routerContext.getConf()); + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + // There should be 100 controller shutdowns. All controllers created should be shut down. + assertEquals(100, StringUtils.countMatches(controllerLog.getOutput(), + "Shutting down router fairness policy controller")); + controllerLog.clearOutput(); + } + @Test public void testRefreshStaticChangeHandlers() throws Exception { + // Setup and mock MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter(); - RemoteMethod dummyMethod = Mockito.mock(RemoteMethod.class); RouterRpcClient client = Mockito.spy(routerContext.getRouterRpcClient()); final long sleepTime = 3000; Mockito.doAnswer(invocationOnMock -> { @@ -86,46 +164,71 @@ public void testRefreshStaticChangeHandlers() throws Exception { }).when(client) .invokeMethod(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); - final int nThreads = 3; - Thread[] threadAcquirePermits = new Thread[nThreads]; - for (int i = 0; i < nThreads; i++) { - Thread threadAcquirePermit = new Thread(() -> { - try { - client.invokeSingle("ns0", dummyMethod); - } catch (IOException e) { - e.printStackTrace(); - } - }); - threadAcquirePermits[i] = threadAcquirePermit; - threadAcquirePermits[i].start(); - } + // No calls yet + assertEquals("{}", + routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitAcceptedPerNs()); + List preRefreshInvocations = makeDummyInvocations(client, 4, "ns0"); - Thread.sleep(1000); + Thread.sleep(2000); + // 3 permits acquired, calls will take 3s to finish and release permits + // 1 invocation rejected + assertEquals("{\"ns0\":3}", + routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitAcceptedPerNs()); + assertEquals("{\"ns0\":1}", + routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitRejectedPerNs()); Configuration conf = routerContext.getConf(); - final int newNs0Permits = 1; // Set to smaller than current handler count (3) + final int newNs0Permits = 2; + final int newNs1Permits = 4; conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns0", newNs0Permits); + conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", newNs1Permits); Thread threadRefreshController = new Thread(() -> { client.refreshFairnessPolicyController(routerContext.getConf()); }); threadRefreshController.start(); threadRefreshController.join(); + // Wait for all dummy invocation threads to finish + for (Thread thread : preRefreshInvocations) { + thread.join(); + } + + // Controller should now have 2:4 handlers for ns0:ns1 + // Make 4 calls to ns0 and 6 calls to ns1 so that each will fail twice StaticRouterRpcFairnessPolicyController controller = (StaticRouterRpcFairnessPolicyController) client.getRouterRpcFairnessPolicyController(); - for (int i = 0; i < nThreads; i++) { - threadAcquirePermits[i].join(); - } + System.out.println(controller.getAvailableHandlerOnPerNs()); + List ns0Invocations = makeDummyInvocations(client, newNs0Permits + 2, "ns0"); + List ns1Invocations = makeDummyInvocations(client, newNs1Permits + 2, "ns1"); - // Controller should now have 5:1 handlers for concurrent:ns0 - for (int i = 0; i < 5; i++) { - assertTrue(controller.acquirePermit(CONCURRENT_NS)); + // Wait for these threads to finish + for (Thread thread : ns0Invocations) { + thread.join(); } - // Invocations before refresh should not interfere with invocations after - assertTrue(controller.acquirePermit("ns0")); + for (Thread thread : ns1Invocations) { + thread.join(); + } + assertEquals("{\"ns0\":5,\"ns1\":4}", + routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitAcceptedPerNs()); + assertEquals("{\"ns0\":3,\"ns1\":2}", + routerContext.getRouterRpcServer().getRPCMetrics().getProxyOpPermitRejectedPerNs()); + } - // Acquiring a permit on any ns now will fail due to overload - assertFalse(controller.acquirePermit(CONCURRENT_NS)); - assertFalse(controller.acquirePermit("ns0")); + private List makeDummyInvocations(RouterRpcClient client, final int nThreads, + final String namespace) { + RemoteMethod dummyMethod = Mockito.mock(RemoteMethod.class); + List threadAcquirePermits = new ArrayList<>(); + for (int i = 0; i < nThreads; i++) { + Thread threadAcquirePermit = new Thread(() -> { + try { + client.invokeSingle(namespace, dummyMethod); + } catch (IOException e) { + e.printStackTrace(); + } + }); + threadAcquirePermits.add(threadAcquirePermit); + threadAcquirePermit.start(); + } + return threadAcquirePermits; } } From 34e207fc65f1ba363f3b7322482cc23428aded98 Mon Sep 17 00:00:00 2001 From: Nguyen Cong Thanh Date: Thu, 21 Apr 2022 19:04:27 +0800 Subject: [PATCH 5/6] Fix checkstyle --- .../fairness/TestRouterRefreshFairnessPolicyController.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java index 75c2c12449065..dfda47b9a53f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java @@ -100,8 +100,8 @@ public void testRefreshNonexistentHandlerClass() { @Test public void testRefreshClassDoesNotImplementControllerInterface() { MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter(); - routerContext.getConf().set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, - "org.apache.hadoop.hdfs.server.federation.fairness.TestRouterRefreshFairnessPolicyController"); + routerContext.getConf() + .set(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, "java.lang.String"); assertEquals(StaticRouterRpcFairnessPolicyController.class.getCanonicalName(), routerContext.getRouterRpcClient() .refreshFairnessPolicyController(routerContext.getConf())); From ee8036bf07a1e8505e2f5652683516f23f613d8e Mon Sep 17 00:00:00 2001 From: Nguyen Cong Thanh Date: Tue, 26 Apr 2022 11:23:22 +0800 Subject: [PATCH 6/6] Fix checkstyle --- .../hadoop/hdfs/server/federation/router/RouterRpcServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 0287c9b3d3339..69f300bfb7d35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -537,7 +537,7 @@ public FileSubclusterResolver getSubclusterResolver() { } /** - * Get the active namenode resolver + * Get the active namenode resolver. * * @return Active namenode resolver. */