Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class MembershipNamenodeResolver
* name and a boolean indicating if observer namenodes should be listed first.
* If true, observer namenodes are listed first. If false, active namenodes are listed first.
* Invalidated on cache refresh. */
private Map<Pair<String,Boolean>, List<? extends FederationNamenodeContext>> cacheNS;
private Map<Pair<String, Boolean>, List<? extends FederationNamenodeContext>> cacheNS;
/** Cached lookup of NN for block pool. Invalidated on cache refresh. */
private Map<String, List<? extends FederationNamenodeContext>> cacheBP;

Expand Down Expand Up @@ -483,9 +483,9 @@ public void setRouterId(String router) {
* Rotate cache, make the current namenode have the lowest priority,
* to ensure that the current namenode will not be accessed first next time.
*
* @param nsId name service id
* @param namenode namenode contexts
* @param listObserversFirst Observer read case, observer NN will be ranked first
* @param nsId name service id.
* @param namenode namenode contexts.
* @param listObserversFirst Observer read case, observer NN will be ranked first.
*/
@Override
public void rotateCache(
Expand All @@ -494,29 +494,32 @@ public void rotateCache(
if (namenodeContexts == null || namenodeContexts.size() <= 1) {
return namenodeContexts;
}
FederationNamenodeContext firstNamenodeContext = namenodeContexts.get(0);
/*
* If the first nn in the cache is active, the active nn priority cannot be lowered.
* This happens when other threads have already updated the cache.
*/
if (firstNamenodeContext.getState().equals(ACTIVE)) {
return namenodeContexts;

// If there is active nn, rotateCache is not needed
// because the router has already loaded the cache.
for (FederationNamenodeContext namenodeContext : namenodeContexts) {
if (namenodeContext.getState() == ACTIVE) {
return namenodeContexts;
}
}
/*
* If the first nn in the cache at this time is not the nn
* that needs to be lowered in priority, there is no need to rotate.
* This happens when other threads have already rotated the cache.
*/
if (firstNamenodeContext.getRpcAddress().equals(namenode.getRpcAddress())) {
List<FederationNamenodeContext> rotatedNnContexts = new ArrayList<>(namenodeContexts);
Collections.rotate(rotatedNnContexts, -1);
String firstNamenodeId = namenodeContexts.get(0).getNamenodeId();
LOG.info("Rotate cache of pair <ns: {}, observer first: {}>, put namenode: {} in the " +
"first position of the cache and namenode: {} in the last position of the cache",
nsId, listObserversFirst, firstNamenodeId, namenode.getNamenodeId());
return rotatedNnContexts;

// If the last namenode in the cache at this time
// is the namenode whose priority needs to be lowered,
// there is no need to rotate the cache, because other threads have already rotated it.
FederationNamenodeContext lastNamenode = namenodeContexts.get(namenodeContexts.size()-1);
if (lastNamenode.getRpcAddress().equals(namenode.getRpcAddress())) {
return namenodeContexts;
}
return namenodeContexts;

// Move the inaccessible namenode to the end of the cache,
// to ensure that the namenode will not be accessed first next time.
List<FederationNamenodeContext> rotateNamenodeContexts =
(List<FederationNamenodeContext>) namenodeContexts;
rotateNamenodeContexts.remove(namenode);
rotateNamenodeContexts.add(namenode);
LOG.info("Rotate cache of pair<{}, {}> -> {}",
nsId, listObserversFirst, rotateNamenodeContexts);
return rotateNamenodeContexts;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -457,14 +457,17 @@ private static IOException toIOException(Exception e) {
* @param ioe IOException reported.
* @param retryCount Number of retries.
* @param nsId Nameservice ID.
* @param namenode namenode context.
* @param listObserverFirst Observer read case, observer NN will be ranked first.
* @return Retry decision.
* @throws NoNamenodesAvailableException Exception that the retry policy
* generates for no available namenodes.
* @throws IOException An IO Error occurred.
*/
private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
final String nsId) throws IOException {
private RetryDecision shouldRetry(
final IOException ioe, final int retryCount, final String nsId,
final FederationNamenodeContext namenode,
final boolean listObserverFirst) throws IOException {
// check for the case of cluster unavailable state
if (isClusterUnAvailable(nsId)) {
if (isClusterUnAvailable(nsId, namenode, listObserverFirst)) {
// we allow to retry once if cluster is unavailable
if (retryCount == 0) {
return RetryDecision.RETRY;
Expand Down Expand Up @@ -538,7 +541,7 @@ public Object invokeMethod(
ProxyAndInfo<?> client = connection.getClient();
final Object proxy = client.getProxy();

ret = invoke(nsId, 0, method, proxy, params);
ret = invoke(nsId, namenode, useObserver, 0, method, proxy, params);
if (failover &&
FederationNamenodeServiceState.OBSERVER != namenode.getState()) {
// Success on alternate server, update
Expand Down Expand Up @@ -594,13 +597,16 @@ public Object invokeMethod(
se.initCause(ioe);
throw se;
} else if (ioe instanceof NoNamenodesAvailableException) {
IOException cause = (IOException) ioe.getCause();
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpNoNamenodes(nsId);
}
LOG.error("Cannot get available namenode for {} {} error: {}",
nsId, rpcAddress, ioe.getMessage());
// Rotate cache so that client can retry the next namenode in the cache
this.namenodeResolver.rotateCache(nsId, namenode, shouldUseObserver);
if (shouldRotateCache(cause)) {
this.namenodeResolver.rotateCache(nsId, namenode, useObserver);
}
// Throw RetriableException so that client can retry
throw new RetriableException(ioe);
} else {
Expand Down Expand Up @@ -708,7 +714,9 @@ private void addClientInfoToCallerContext(UserGroupInformation ugi) {
* @return Response from the remote server
* @throws IOException If error occurs.
*/
private Object invoke(String nsId, int retryCount, final Method method,
private Object invoke(
String nsId, FederationNamenodeContext namenode, Boolean listObserverFirst,
int retryCount, final Method method,
final Object obj, final Object... params) throws IOException {
try {
return method.invoke(obj, params);
Expand All @@ -721,14 +729,14 @@ private Object invoke(String nsId, int retryCount, final Method method,
IOException ioe = (IOException) cause;

// Check if we should retry.
RetryDecision decision = shouldRetry(ioe, retryCount, nsId);
RetryDecision decision = shouldRetry(ioe, retryCount, nsId, namenode, listObserverFirst);
if (decision == RetryDecision.RETRY) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpRetries();
}

// retry
return invoke(nsId, ++retryCount, method, obj, params);
return invoke(nsId, namenode, listObserverFirst, ++retryCount, method, obj, params);
} else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
// failover, invoker looks for standby exceptions for failover.
if (ioe instanceof StandbyException) {
Expand Down Expand Up @@ -772,13 +780,23 @@ public static boolean isUnavailableException(IOException ioe) {
* Check if the cluster of given nameservice id is unavailable.
*
* @param nsId nameservice ID.
* @param namenode namenode context.
* @param listObserverFirst Observer read case, observer NN will be ranked first.
* @return true if the cluster with given nameservice id is unavailable.
* @throws IOException if error occurs.
*/
private boolean isClusterUnAvailable(String nsId) throws IOException {
private boolean isClusterUnAvailable(
String nsId, FederationNamenodeContext namenode,
boolean listObserverFirst) throws IOException {
// If the operation is an observer read
// and the namenode that caused the exception is an observer,
// false is returned so that the observer can be marked as unavailable, so other observers
// or active namenode which is standby in the cache of the router can be retried.
if (listObserverFirst && namenode.getState() == FederationNamenodeServiceState.OBSERVER) {
return false;
}
List<? extends FederationNamenodeContext> nnState = this.namenodeResolver
.getNamenodesForNameserviceId(nsId, false);

.getNamenodesForNameserviceId(nsId, listObserverFirst);
if (nnState != null) {
for (FederationNamenodeContext nnContext : nnState) {
// Once we find one NN is in active state, we assume this
Expand Down Expand Up @@ -1830,4 +1848,24 @@ private LongAccumulator getTimeOfLastCallToActive(String namespaceId) {
return lastActiveNNRefreshTimes
.computeIfAbsent(namespaceId, key -> new LongAccumulator(Math::max, 0));
}

/**
 * Decide whether the namenode cache should be rotated after a
 * NoNamenodesAvailableException.
 *
 * <p>The cause is checked as-is first. If that check fails and the cause is a
 * RemoteException, it is unwrapped and cleaned, and the result is checked again.
 *
 * @param ioe cause of the NoNamenodesAvailableException.
 * @return true if the cause, directly or after unwrapping a RemoteException, is an
 *         {@link RouterRpcClient#isUnavailableException(IOException) unavailable exception},
 *         otherwise false.
 */
private boolean shouldRotateCache(IOException ioe) {
  // Fast path: the cause itself already signals an unavailable namenode.
  if (isUnavailableException(ioe)) {
    return true;
  }
  IOException candidate = ioe;
  if (ioe instanceof RemoteException) {
    // The exception was raised remotely: recover the underlying exception
    // before checking it a second time.
    IOException unwrapped = ((RemoteException) ioe).unwrapRemoteException();
    candidate = getCleanException(unwrapped);
  }
  return isUnavailableException(candidate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ public class MiniRouterDFSCluster {
/** Mini cluster. */
private MiniDFSCluster cluster;

protected static final long DEFAULT_HEARTBEAT_INTERVAL_MS =
public static final long DEFAULT_HEARTBEAT_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(5);
protected static final long DEFAULT_CACHE_INTERVAL_MS =
public static final long DEFAULT_CACHE_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(5);
/** Heartbeat interval in milliseconds. */
private long heartbeatInterval;
Expand Down Expand Up @@ -240,17 +240,26 @@ public FileSystem getFileSystem(Configuration configuration) throws IOException
}

public FileSystem getFileSystemWithObserverReadProxyProvider() throws IOException {
Configuration observerReadConf = new Configuration(conf);
observerReadConf.set(DFS_NAMESERVICES,
observerReadConf.get(DFS_NAMESERVICES)+ ",router-service");
observerReadConf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".router-service", "router1");
observerReadConf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ ".router-service.router1",
return getFileSystemWithProxyProvider(ObserverReadProxyProvider.class.getName());
}

/**
 * Get a file system set up with ConfiguredFailoverProxyProvider as the
 * proxy provider class.
 *
 * @return the file system returned by getFileSystemWithProxyProvider.
 * @throws IOException if getFileSystemWithProxyProvider throws it.
 */
public FileSystem getFileSystemWithConfiguredFailoverProxyProvider() throws IOException {
  final String proxyProviderClassName = ConfiguredFailoverProxyProvider.class.getName();
  return getFileSystemWithProxyProvider(proxyProviderClassName);
}

private FileSystem getFileSystemWithProxyProvider(
String proxyProviderClassName) throws IOException {
conf.set(DFS_NAMESERVICES,
conf.get(DFS_NAMESERVICES)+ ",router-service");
conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".router-service", "router1");
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ ".router-service.router1",
getFileSystemURI().toString());
observerReadConf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
+ "." + "router-service", ObserverReadProxyProvider.class.getName());
DistributedFileSystem.setDefaultUri(observerReadConf, "hdfs://router-service");

return DistributedFileSystem.get(observerReadConf);
conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
+ "." + "router-service", proxyProviderClassName);
DistributedFileSystem.setDefaultUri(conf, "hdfs://router-service");

return DistributedFileSystem.get(conf);
}

public DFSClient getClient(UserGroupInformation user)
Expand Down
Loading