Browse Source

YARN-11359. [Federation] Routing admin invocations transparently to multiple RMs. (#5057)

slfan1989 2 years ago
parent
commit
eccd2d0492
10 changed files with 899 additions and 161 deletions
  1. 76 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationMethodWrapper.java
  2. 33 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
  3. 46 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
  4. 2 28
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
  5. 8 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java
  6. 283 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
  7. 132 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminProtocolMethod.java
  8. 99 130
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java
  9. 121 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java
  10. 99 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestableFederationRMAdminInterceptor.java

+ 76 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationMethodWrapper.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.utils;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+
+public abstract class FederationMethodWrapper {
+
+  /**
+   * List of parameters: static and dynamic values, matchings types.
+   */
+  private Object[] params;
+
+  /**
+   * List of method parameters types, matches parameters.
+   */
+  private Class<?>[] types;
+
+  /**
+   * String name of the method.
+   */
+  private String methodName;
+
+  public FederationMethodWrapper(Class<?>[] pTypes, Object... pParams)
+      throws IOException {
+    if (pParams.length != pTypes.length) {
+      throw new IOException("Invalid parameters for method.");
+    }
+    this.params = pParams;
+    this.types = Arrays.copyOf(pTypes, pTypes.length);
+  }
+
+  public Object[] getParams() {
+    return Arrays.copyOf(this.params, this.params.length);
+  }
+
+  public String getMethodName() {
+    return methodName;
+  }
+
+  public void setMethodName(String methodName) {
+    this.methodName = methodName;
+  }
+
+  /**
+   * Get the calling types for this method.
+   *
+   * @return An array of calling types.
+   */
+  public Class<?>[] getTypes() {
+    return Arrays.copyOf(this.types, this.types.length);
+  }
+
+  protected abstract <R> Collection<R> invokeConcurrent(Class<R> clazz) throws YarnException;
+}

+ 33 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java

@@ -121,6 +121,8 @@ public final class RouterMetrics {
   private MutableGaugeInt numGetAppTimeoutFailedRetrieved;
   @Metric("# of getAppTimeouts failed to be retrieved")
   private MutableGaugeInt numGetAppTimeoutsFailedRetrieved;
+  @Metric("# of refreshQueues failed to be retrieved")
+  private MutableGaugeInt numRefreshQueuesFailedRetrieved;
   @Metric("# of getRMNodeLabels failed to be retrieved")
   private MutableGaugeInt numGetRMNodeLabelsFailedRetrieved;
   @Metric("# of checkUserAccessToQueue failed to be retrieved")
@@ -207,6 +209,8 @@ public final class RouterMetrics {
   private MutableRate totalSucceededGetAppTimeoutRetrieved;
   @Metric("Total number of successful Retrieved GetAppTimeouts and latency(ms)")
   private MutableRate totalSucceededGetAppTimeoutsRetrieved;
+  @Metric("Total number of successful Retrieved RefreshQueues and latency(ms)")
+  private MutableRate totalSucceededRefreshQueuesRetrieved;
   @Metric("Total number of successful Retrieved GetRMNodeLabels and latency(ms)")
   private MutableRate totalSucceededGetRMNodeLabelsRetrieved;
   @Metric("Total number of successful Retrieved CheckUserAccessToQueue and latency(ms)")
@@ -255,6 +259,7 @@ public final class RouterMetrics {
   private MutableQuantiles getUpdateQueueLatency;
   private MutableQuantiles getAppTimeoutLatency;
   private MutableQuantiles getAppTimeoutsLatency;
+  private MutableQuantiles getRefreshQueuesLatency;
   private MutableQuantiles getRMNodeLabelsLatency;
   private MutableQuantiles checkUserAccessToQueueLatency;
 
@@ -410,6 +415,9 @@ public final class RouterMetrics {
     getAppTimeoutsLatency = registry.newQuantiles("getAppTimeoutsLatency",
          "latency of get apptimeouts timeouts", "ops", "latency", 10);
 
+    getRefreshQueuesLatency = registry.newQuantiles("getRefreshQueuesLatency",
+         "latency of get refresh queues timeouts", "ops", "latency", 10);
+
     getRMNodeLabelsLatency = registry.newQuantiles("getRMNodeLabelsLatency",
         "latency of get rmnodelabels timeouts", "ops", "latency", 10);
 
@@ -636,6 +644,11 @@ public final class RouterMetrics {
     return totalSucceededGetAppTimeoutsRetrieved.lastStat().numSamples();
   }
 
+  @VisibleForTesting
+  public long getNumSucceededRefreshQueuesRetrieved() {
+    return totalSucceededRefreshQueuesRetrieved.lastStat().numSamples();
+  }
+
   @VisibleForTesting
   public long getNumSucceededGetRMNodeLabelsRetrieved() {
     return totalSucceededGetRMNodeLabelsRetrieved.lastStat().numSamples();
@@ -846,6 +859,11 @@ public final class RouterMetrics {
     return totalSucceededGetAppTimeoutsRetrieved.lastStat().mean();
   }
 
+  @VisibleForTesting
+  public double getLatencySucceededRefreshQueuesRetrieved() {
+    return totalSucceededRefreshQueuesRetrieved.lastStat().mean();
+  }
+
   @VisibleForTesting
   public double getLatencySucceededGetRMNodeLabelsRetrieved() {
     return totalSucceededGetRMNodeLabelsRetrieved.lastStat().mean();
@@ -1037,6 +1055,11 @@ public final class RouterMetrics {
     return numGetAppTimeoutsFailedRetrieved.value();
   }
 
+
+  public int getRefreshQueuesFailedRetrieved() {
+    return numRefreshQueuesFailedRetrieved.value();
+  }
+
   public int getRMNodeLabelsFailedRetrieved() {
     return numGetRMNodeLabelsFailedRetrieved.value();
   }
@@ -1245,6 +1268,11 @@ public final class RouterMetrics {
     getAppTimeoutsLatency.add(duration);
   }
 
+  public void succeededRefreshQueuesRetrieved(long duration) {
+    totalSucceededRefreshQueuesRetrieved.add(duration);
+    getRefreshQueuesLatency.add(duration);
+  }
+
   public void succeededGetRMNodeLabelsRetrieved(long duration) {
     totalSucceededGetRMNodeLabelsRetrieved.add(duration);
     getRMNodeLabelsLatency.add(duration);
@@ -1415,6 +1443,10 @@ public final class RouterMetrics {
     numGetAppTimeoutsFailedRetrieved.incr();
   }
 
+  public void incrRefreshQueuesFailedRetrieved() {
+    numRefreshQueuesFailedRetrieved.incr();
+  }
+
   public void incrGetRMNodeLabelsFailedRetrieved() {
     numGetRMNodeLabelsFailedRetrieved.incr();
   }
@@ -1422,4 +1454,4 @@ public final class RouterMetrics {
   public void incrCheckUserAccessToQueueFailedRetrieved() {
     numCheckUserAccessToQueueFailedRetrieved.incr();
   }
-}
+}

+ 46 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -299,6 +300,28 @@ public final class RouterServerUtil {
     return logAndReturnRunTimeException(null, errMsgFormat, args);
   }
 
+  /**
+   * Throws an YarnRuntimeException due to an error.
+   *
+   * @param t the throwable raised in the called class.
+   * @param errMsgFormat the error message format string.
+   * @param args referenced by the format specifiers in the format string.
+   * @return YarnRuntimeException
+   */
+  @Public
+  @Unstable
+  public static YarnRuntimeException logAndReturnYarnRunTimeException(
+      Throwable t, String errMsgFormat, Object... args) {
+    String msg = String.format(errMsgFormat, args);
+    if (t != null) {
+      LOG.error(msg, t);
+      return new YarnRuntimeException(msg, t);
+    } else {
+      LOG.error(msg);
+      return new YarnRuntimeException(msg);
+    }
+  }
+
   /**
    * Check applicationId is accurate.
    *
@@ -491,4 +514,27 @@ public final class RouterServerUtil {
     // Randomly choose a SubCluster
     return subClusterIds.get(rand.nextInt(subClusterIds.size()));
   }
+
+  public static UserGroupInformation setupUser(final String userName) {
+    UserGroupInformation user = null;
+    try {
+      // If userName is empty, we will return UserGroupInformation.getCurrentUser.
+      // Do not create a proxy user if user name matches the user name on
+      // current UGI
+      if (userName == null || userName.trim().isEmpty()) {
+        user = UserGroupInformation.getCurrentUser();
+      } else if (UserGroupInformation.isSecurityEnabled()) {
+        user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
+      } else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
+        user = UserGroupInformation.getCurrentUser();
+      } else {
+        user = UserGroupInformation.createProxyUser(userName,
+            UserGroupInformation.getCurrentUser());
+      }
+      return user;
+    } catch (IOException e) {
+      throw RouterServerUtil.logAndReturnYarnRunTimeException(e,
+          "Error while creating Router RMAdmin Service for user : %s.", user);
+    }
+  }
 }

+ 2 - 28
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java

@@ -18,11 +18,9 @@
 
 package org.apache.hadoop.yarn.server.router.clientrm;
 
-import java.io.IOException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.router.RouterServerUtil;
 import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,7 +78,7 @@ public abstract class AbstractClientRequestInterceptor
    */
   @Override
   public void init(String userName) {
-    setupUser(userName);
+    this.user = RouterServerUtil.setupUser(userName);
     if (this.nextInterceptor != null) {
       this.nextInterceptor.init(userName);
     }
@@ -104,30 +102,6 @@ public abstract class AbstractClientRequestInterceptor
     return this.nextInterceptor;
   }
 
-  private void setupUser(String userName) {
-
-    try {
-      // Do not create a proxy user if user name matches the user name on
-      // current UGI
-      if (UserGroupInformation.isSecurityEnabled()) {
-        user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
-      } else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
-        user = UserGroupInformation.getCurrentUser();
-      } else {
-        user = UserGroupInformation.createProxyUser(userName,
-            UserGroupInformation.getCurrentUser());
-      }
-    } catch (IOException e) {
-      String message = "Error while creating Router ClientRM Service for user:";
-      if (user != null) {
-        message += ", user: " + user;
-      }
-
-      LOG.info(message);
-      throw new YarnRuntimeException(message, e);
-    }
-  }
-
   @Override
   public RouterDelegationTokenSecretManager getTokenSecretManager() {
     return tokenSecretManager;

+ 8 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.router.rmadmin;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.server.router.RouterServerUtil;
 
 /**
  * Implements the {@link RMAdminRequestInterceptor} interface and provides
@@ -31,6 +33,9 @@ public abstract class AbstractRMAdminRequestInterceptor
   private Configuration conf;
   private RMAdminRequestInterceptor nextInterceptor;
 
+  @SuppressWarnings("checkstyle:visibilitymodifier")
+  protected UserGroupInformation user = null;
+
   /**
    * Sets the {@link RMAdminRequestInterceptor} in the chain.
    */
@@ -63,9 +68,10 @@ public abstract class AbstractRMAdminRequestInterceptor
    * Initializes the {@link RMAdminRequestInterceptor}.
    */
   @Override
-  public void init(String user) {
+  public void init(String userName) {
+    this.user = RouterServerUtil.setupUser(userName);
     if (this.nextInterceptor != null) {
-      this.nextInterceptor.init(user);
+      this.nextInterceptor.init(userName);
     }
   }
 

+ 283 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java

@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
+import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.router.RouterMetrics;
+import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Collection;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class FederationRMAdminInterceptor extends AbstractRMAdminRequestInterceptor {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationRMAdminInterceptor.class);
+
+  private Map<SubClusterId, ResourceManagerAdministrationProtocol> adminRMProxies;
+  private FederationStateStoreFacade federationFacade;
+  private final Clock clock = new MonotonicClock();
+  private RouterMetrics routerMetrics;
+  private ThreadPoolExecutor executorService;
+  private Configuration conf;
+
+  @Override
+  public void init(String userName) {
+    super.init(userName);
+
+    int numThreads = getConf().getInt(
+        YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE,
+        YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("RPC Router RMAdminClient-" + userName + "-%d ").build();
+
+    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
+    this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
+        0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
+
+    federationFacade = FederationStateStoreFacade.getInstance();
+    this.conf = this.getConf();
+    this.adminRMProxies = new ConcurrentHashMap<>();
+    routerMetrics = RouterMetrics.getMetrics();
+  }
+
+  @VisibleForTesting
+  protected ResourceManagerAdministrationProtocol getAdminRMProxyForSubCluster(
+      SubClusterId subClusterId) throws YarnException {
+
+    if (adminRMProxies.containsKey(subClusterId)) {
+      return adminRMProxies.get(subClusterId);
+    }
+
+    ResourceManagerAdministrationProtocol adminRMProxy = null;
+    try {
+      boolean serviceAuthEnabled = this.conf.getBoolean(
+          CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
+      UserGroupInformation realUser = user;
+      if (serviceAuthEnabled) {
+        realUser = UserGroupInformation.createProxyUser(
+            user.getShortUserName(), UserGroupInformation.getLoginUser());
+      }
+      adminRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(),
+          ResourceManagerAdministrationProtocol.class, subClusterId, realUser);
+    } catch (Exception e) {
+      RouterServerUtil.logAndThrowException(e,
+          "Unable to create the interface to reach the SubCluster %s", subClusterId);
+    }
+    adminRMProxies.put(subClusterId, adminRMProxy);
+    return adminRMProxy;
+  }
+
+  @Override
+  public void setNextInterceptor(RMAdminRequestInterceptor next) {
+    throw new YarnRuntimeException("setNextInterceptor is being called on "
+       + "FederationRMAdminRequestInterceptor, which should be the last one "
+       + "in the chain. Check if the interceptor pipeline configuration "
+       + "is correct");
+  }
+
+  @Override
+  public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
+      throws StandbyException, YarnException, IOException {
+
+    // parameter verification.
+    if (request == null) {
+      routerMetrics.incrRefreshQueuesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing RefreshQueues request.", null);
+    }
+
+    // call refreshQueues of activeSubClusters.
+    try {
+      long startTime = clock.getTime();
+      RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
+           new Class[] {RefreshQueuesRequest.class}, new Object[] {request});
+
+      Collection<RefreshQueuesResponse> refreshQueueResps =
+          remoteMethod.invokeConcurrent(this, RefreshQueuesResponse.class);
+
+      // If we get the return result from refreshQueueResps,
+      // it means that the call has been successful,
+      // and the RefreshQueuesResponse method can be reconstructed and returned.
+      if (CollectionUtils.isNotEmpty(refreshQueueResps)) {
+        long stopTime = clock.getTime();
+        routerMetrics.succeededRefreshQueuesRetrieved(stopTime - startTime);
+        return RefreshQueuesResponse.newInstance();
+      }
+    } catch (Exception e) {
+      routerMetrics.incrRefreshQueuesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Unable to refreshQueue to exception.", e);
+    }
+
+    routerMetrics.incrRefreshQueuesFailedRetrieved();
+    throw new YarnException("Unable to refreshQueue.");
+  }
+
+  @Override
+  public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
+      throws StandbyException, YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
+      RefreshSuperUserGroupsConfigurationRequest request)
+      throws StandbyException, YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
+      RefreshUserToGroupsMappingsRequest request)
+      throws StandbyException, YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RefreshAdminAclsResponse refreshAdminAcls(RefreshAdminAclsRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RefreshServiceAclsResponse refreshServiceAcls(RefreshServiceAclsRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public UpdateNodeResourceResponse updateNodeResource(UpdateNodeResourceRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RefreshNodesResourcesResponse refreshNodesResources(RefreshNodesResourcesRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
+      AddToClusterNodeLabelsRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
+      RemoveFromClusterNodeLabelsRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(ReplaceLabelsOnNodeRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
+      CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
+      RefreshClusterMaxPriorityRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public NodesToAttributesMappingResponse mapAttributesToNodes(
+      NodesToAttributesMappingRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public String[] getGroupsForUser(String user) throws IOException {
+    return new String[0];
+  }
+
+  @VisibleForTesting
+  public FederationStateStoreFacade getFederationFacade() {
+    return federationFacade;
+  }
+
+  @VisibleForTesting
+  public ThreadPoolExecutor getExecutorService() {
+    return executorService;
+  }
+}

+ 132 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminProtocolMethod.java

@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationMethodWrapper;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+
+
+/**
+ * Class to define admin method, params and arguments.
+ */
+public class RMAdminProtocolMethod extends FederationMethodWrapper {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RMAdminProtocolMethod.class);
+
+  private FederationStateStoreFacade federationFacade;
+  private FederationRMAdminInterceptor rmAdminInterceptor;
+  private Configuration configuration;
+
+  public RMAdminProtocolMethod(Class<?>[] pTypes, Object... pParams)
+      throws IOException {
+    super(pTypes, pParams);
+  }
+
+  public <R> Collection<R> invokeConcurrent(FederationRMAdminInterceptor interceptor,
+      Class<R> clazz) throws YarnException {
+    this.rmAdminInterceptor = interceptor;
+    this.federationFacade = FederationStateStoreFacade.getInstance();
+    this.configuration = interceptor.getConf();
+    return invokeConcurrent(clazz);
+  }
+
+  @Override
+  protected <R> Collection<R> invokeConcurrent(Class<R> clazz) throws YarnException {
+    String methodName = Thread.currentThread().getStackTrace()[3].getMethodName();
+    this.setMethodName(methodName);
+
+    ThreadPoolExecutor executorService = rmAdminInterceptor.getExecutorService();
+
+    // Get Active SubClusters
+    Map<SubClusterId, SubClusterInfo> subClusterInfo =
+        federationFacade.getSubClusters(true);
+    Collection<SubClusterId> subClusterIds = subClusterInfo.keySet();
+
+    List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>();
+    List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>();
+    Map<SubClusterId, Exception> exceptions = new TreeMap<>();
+
+    // Generate parallel Callable tasks
+    for (SubClusterId subClusterId : subClusterIds) {
+      callables.add(() -> {
+        ResourceManagerAdministrationProtocol protocol =
+            rmAdminInterceptor.getAdminRMProxyForSubCluster(subClusterId);
+        Class<?>[] types = this.getTypes();
+        Object[] params = this.getParams();
+        Method method = ResourceManagerAdministrationProtocol.class.getMethod(methodName, types);
+        Object result = method.invoke(protocol, params);
+        return Pair.of(subClusterId, result);
+      });
+    }
+
+    // Get results from multiple threads
+    Map<SubClusterId, R> results = new TreeMap<>();
+    try {
+      futures.addAll(executorService.invokeAll(callables));
+      futures.stream().forEach(future -> {
+        SubClusterId subClusterId = null;
+        try {
+          Pair<SubClusterId, Object> pair = future.get();
+          subClusterId = pair.getKey();
+          Object result = pair.getValue();
+          results.put(subClusterId, clazz.cast(result));
+        } catch (InterruptedException | ExecutionException e) {
+          Throwable cause = e.getCause();
+          LOG.error("Cannot execute {} on {}: {}", methodName, subClusterId, cause.getMessage());
+          exceptions.put(subClusterId, e);
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new YarnException("invokeConcurrent Failed.", e);
+    }
+
+    // All sub-clusters return results to be considered successful,
+    // otherwise an exception will be thrown.
+    if (exceptions != null && !exceptions.isEmpty()) {
+      Set<SubClusterId> subClusterIdSets = exceptions.keySet();
+      throw new YarnException("invokeConcurrent Failed, An exception occurred in subClusterIds = " +
+          StringUtils.join(subClusterIdSets, ","));
+    }
+
+    // return result
+    return results.values();
+  }
+}

+ 99 - 130
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java

@@ -88,27 +88,36 @@ public abstract class BaseRouterRMAdminTest {
 
   @Before
   public void setUp() {
-    this.conf = new YarnConfiguration();
+    this.conf = createConfiguration();
+    this.dispatcher = new AsyncDispatcher();
+    this.dispatcher.init(conf);
+    this.dispatcher.start();
+    this.rmAdminService = createAndStartRouterRMAdminService();
+    DefaultMetricsSystem.setMiniClusterMode(true);
+  }
+
+  protected Configuration getConf() {
+    return this.conf;
+  }
+
+  public void setUpConfig() {
+    this.conf = createConfiguration();
+  }
+
+  protected Configuration createConfiguration() {
+    YarnConfiguration config = new YarnConfiguration();
     String mockPassThroughInterceptorClass =
         PassThroughRMAdminRequestInterceptor.class.getName();
 
     // Create a request interceptor pipeline for testing. The last one in the
     // chain will call the mock resource manager. The others in the chain will
     // simply forward it to the next one in the chain
-    this.conf.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
-        mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
-            + "," + mockPassThroughInterceptorClass + ","
-            + MockRMAdminRequestInterceptor.class.getName());
-
-    this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
-        TEST_MAX_CACHE_SIZE);
+    config.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
+        mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + "," +
+        mockPassThroughInterceptorClass + "," + MockRMAdminRequestInterceptor.class.getName());
 
-    this.dispatcher = new AsyncDispatcher();
-    this.dispatcher.init(conf);
-    this.dispatcher.start();
-    this.rmAdminService = createAndStartRouterRMAdminService();
-
-    DefaultMetricsSystem.setMiniClusterMode(true);
+    config.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE, TEST_MAX_CACHE_SIZE);
+    return config;
   }
 
   @After
@@ -142,194 +151,154 @@ public abstract class BaseRouterRMAdminTest {
   protected RefreshQueuesResponse refreshQueues(String user)
       throws IOException, InterruptedException {
     return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<RefreshQueuesResponse>() {
-          @Override
-          public RefreshQueuesResponse run() throws Exception {
-            RefreshQueuesRequest req = RefreshQueuesRequest.newInstance();
-            RefreshQueuesResponse response =
-                getRouterRMAdminService().refreshQueues(req);
-            return response;
-          }
+        .doAs((PrivilegedExceptionAction<RefreshQueuesResponse>) () -> {
+          RefreshQueuesRequest req = RefreshQueuesRequest.newInstance();
+          RefreshQueuesResponse response =
+              getRouterRMAdminService().refreshQueues(req);
+          return response;
         });
   }
 
   protected RefreshNodesResponse refreshNodes(String user)
       throws IOException, InterruptedException {
     return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<RefreshNodesResponse>() {
-          @Override
-          public RefreshNodesResponse run() throws Exception {
-            RefreshNodesRequest req = RefreshNodesRequest.newInstance();
-            RefreshNodesResponse response =
-                getRouterRMAdminService().refreshNodes(req);
-            return response;
-          }
+        .doAs((PrivilegedExceptionAction<RefreshNodesResponse>) () -> {
+          RefreshNodesRequest req = RefreshNodesRequest.newInstance();
+          RefreshNodesResponse response =
+              getRouterRMAdminService().refreshNodes(req);
+          return response;
         });
   }
 
   protected RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
       String user) throws IOException, InterruptedException {
     return UserGroupInformation.createRemoteUser(user).doAs(
-        new PrivilegedExceptionAction<RefreshSuperUserGroupsConfigurationResponse>() {
-          @Override
-          public RefreshSuperUserGroupsConfigurationResponse run()
-              throws Exception {
-            RefreshSuperUserGroupsConfigurationRequest req =
-                RefreshSuperUserGroupsConfigurationRequest.newInstance();
-            RefreshSuperUserGroupsConfigurationResponse response =
-                getRouterRMAdminService()
-                    .refreshSuperUserGroupsConfiguration(req);
-            return response;
-          }
+        (PrivilegedExceptionAction<RefreshSuperUserGroupsConfigurationResponse>) () -> {
+          RefreshSuperUserGroupsConfigurationRequest req =
+              RefreshSuperUserGroupsConfigurationRequest.newInstance();
+          RefreshSuperUserGroupsConfigurationResponse response =
+              getRouterRMAdminService()
+                  .refreshSuperUserGroupsConfiguration(req);
+          return response;
         });
   }
 
   protected RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
       String user) throws IOException, InterruptedException {
     return UserGroupInformation.createRemoteUser(user).doAs(
-        new PrivilegedExceptionAction<RefreshUserToGroupsMappingsResponse>() {
-          @Override
-          public RefreshUserToGroupsMappingsResponse run() throws Exception {
-            RefreshUserToGroupsMappingsRequest req =
-                RefreshUserToGroupsMappingsRequest.newInstance();
-            RefreshUserToGroupsMappingsResponse response =
-                getRouterRMAdminService().refreshUserToGroupsMappings(req);
-            return response;
-          }
+        (PrivilegedExceptionAction<RefreshUserToGroupsMappingsResponse>) () -> {
+          RefreshUserToGroupsMappingsRequest req =
+              RefreshUserToGroupsMappingsRequest.newInstance();
+          RefreshUserToGroupsMappingsResponse response =
+              getRouterRMAdminService().refreshUserToGroupsMappings(req);
+          return response;
         });
   }
 
   protected RefreshAdminAclsResponse refreshAdminAcls(String user)
       throws IOException, InterruptedException {
     return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<RefreshAdminAclsResponse>() {
-          @Override
-          public RefreshAdminAclsResponse run() throws Exception {
-            RefreshAdminAclsRequest req = RefreshAdminAclsRequest.newInstance();
-            RefreshAdminAclsResponse response =
-                getRouterRMAdminService().refreshAdminAcls(req);
-            return response;
-          }
+        .doAs((PrivilegedExceptionAction<RefreshAdminAclsResponse>) () -> {
+          RefreshAdminAclsRequest req = RefreshAdminAclsRequest.newInstance();
+          RefreshAdminAclsResponse response =
+              getRouterRMAdminService().refreshAdminAcls(req);
+          return response;
         });
   }
 
   protected RefreshServiceAclsResponse refreshServiceAcls(String user)
       throws IOException, InterruptedException {
     return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<RefreshServiceAclsResponse>() {
-          @Override
-          public RefreshServiceAclsResponse run() throws Exception {
-            RefreshServiceAclsRequest req =
-                RefreshServiceAclsRequest.newInstance();
-            RefreshServiceAclsResponse response =
-                getRouterRMAdminService().refreshServiceAcls(req);
-            return response;
-          }
+        .doAs((PrivilegedExceptionAction<RefreshServiceAclsResponse>) () -> {
+          RefreshServiceAclsRequest req =
+              RefreshServiceAclsRequest.newInstance();
+          RefreshServiceAclsResponse response =
+              getRouterRMAdminService().refreshServiceAcls(req);
+          return response;
         });
   }
 
   protected UpdateNodeResourceResponse updateNodeResource(String user)
       throws IOException, InterruptedException {
     return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<UpdateNodeResourceResponse>() {
-          @Override
-          public UpdateNodeResourceResponse run() throws Exception {
-            UpdateNodeResourceRequest req =
-                UpdateNodeResourceRequest.newInstance(null);
-            UpdateNodeResourceResponse response =
-                getRouterRMAdminService().updateNodeResource(req);
-            return response;
-          }
+        .doAs((PrivilegedExceptionAction<UpdateNodeResourceResponse>) () -> {
+          UpdateNodeResourceRequest req =
+              UpdateNodeResourceRequest.newInstance(null);
+          UpdateNodeResourceResponse response =
+              getRouterRMAdminService().updateNodeResource(req);
+          return response;
         });
   }
 
   protected RefreshNodesResourcesResponse refreshNodesResources(String user)
       throws IOException, InterruptedException {
     return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<RefreshNodesResourcesResponse>() {
-          @Override
-          public RefreshNodesResourcesResponse run() throws Exception {
-            RefreshNodesResourcesRequest req =
-                RefreshNodesResourcesRequest.newInstance();
-            RefreshNodesResourcesResponse response =
-                getRouterRMAdminService().refreshNodesResources(req);
-            return response;
-          }
+        .doAs((PrivilegedExceptionAction<RefreshNodesResourcesResponse>) () -> {
+          RefreshNodesResourcesRequest req =
+              RefreshNodesResourcesRequest.newInstance();
+          RefreshNodesResourcesResponse response =
+              getRouterRMAdminService().refreshNodesResources(req);
+          return response;
         });
   }
 
   protected AddToClusterNodeLabelsResponse addToClusterNodeLabels(String user)
       throws IOException, InterruptedException {
     return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<AddToClusterNodeLabelsResponse>() {
-          @Override
-          public AddToClusterNodeLabelsResponse run() throws Exception {
-            AddToClusterNodeLabelsRequest req =
-                AddToClusterNodeLabelsRequest.newInstance(null);
-            AddToClusterNodeLabelsResponse response =
-                getRouterRMAdminService().addToClusterNodeLabels(req);
-            return response;
-          }
+        .doAs((PrivilegedExceptionAction<AddToClusterNodeLabelsResponse>) () -> {
+          AddToClusterNodeLabelsRequest req =
+              AddToClusterNodeLabelsRequest.newInstance(null);
+          AddToClusterNodeLabelsResponse response =
+              getRouterRMAdminService().addToClusterNodeLabels(req);
+          return response;
         });
   }
 
   protected RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
       String user) throws IOException, InterruptedException {
     return UserGroupInformation.createRemoteUser(user).doAs(
-        new PrivilegedExceptionAction<RemoveFromClusterNodeLabelsResponse>() {
-          @Override
-          public RemoveFromClusterNodeLabelsResponse run() throws Exception {
-            RemoveFromClusterNodeLabelsRequest req =
-                RemoveFromClusterNodeLabelsRequest.newInstance(null);
-            RemoveFromClusterNodeLabelsResponse response =
-                getRouterRMAdminService().removeFromClusterNodeLabels(req);
-            return response;
-          }
+        (PrivilegedExceptionAction<RemoveFromClusterNodeLabelsResponse>) () -> {
+          RemoveFromClusterNodeLabelsRequest req =
+              RemoveFromClusterNodeLabelsRequest.newInstance(null);
+          RemoveFromClusterNodeLabelsResponse response =
+              getRouterRMAdminService().removeFromClusterNodeLabels(req);
+          return response;
         });
   }
 
   protected ReplaceLabelsOnNodeResponse replaceLabelsOnNode(String user)
       throws IOException, InterruptedException {
     return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<ReplaceLabelsOnNodeResponse>() {
-          @Override
-          public ReplaceLabelsOnNodeResponse run() throws Exception {
-            ReplaceLabelsOnNodeRequest req = ReplaceLabelsOnNodeRequest
-                .newInstance(new HashMap<NodeId, Set<String>>());
-            ReplaceLabelsOnNodeResponse response =
-                getRouterRMAdminService().replaceLabelsOnNode(req);
-            return response;
-          }
+        .doAs((PrivilegedExceptionAction<ReplaceLabelsOnNodeResponse>) () -> {
+          ReplaceLabelsOnNodeRequest req = ReplaceLabelsOnNodeRequest
+              .newInstance(new HashMap<NodeId, Set<String>>());
+          ReplaceLabelsOnNodeResponse response =
+              getRouterRMAdminService().replaceLabelsOnNode(req);
+          return response;
         });
   }
 
   protected CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
       String user) throws IOException, InterruptedException {
     return UserGroupInformation.createRemoteUser(user).doAs(
-        new PrivilegedExceptionAction<CheckForDecommissioningNodesResponse>() {
-          @Override
-          public CheckForDecommissioningNodesResponse run() throws Exception {
-            CheckForDecommissioningNodesRequest req =
-                CheckForDecommissioningNodesRequest.newInstance();
-            CheckForDecommissioningNodesResponse response =
-                getRouterRMAdminService().checkForDecommissioningNodes(req);
-            return response;
-          }
+        (PrivilegedExceptionAction<CheckForDecommissioningNodesResponse>) () -> {
+          CheckForDecommissioningNodesRequest req =
+              CheckForDecommissioningNodesRequest.newInstance();
+          CheckForDecommissioningNodesResponse response =
+              getRouterRMAdminService().checkForDecommissioningNodes(req);
+          return response;
         });
   }
 
   protected RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
       String user) throws IOException, InterruptedException {
     return UserGroupInformation.createRemoteUser(user).doAs(
-        new PrivilegedExceptionAction<RefreshClusterMaxPriorityResponse>() {
-          @Override
-          public RefreshClusterMaxPriorityResponse run() throws Exception {
-            RefreshClusterMaxPriorityRequest req =
-                RefreshClusterMaxPriorityRequest.newInstance();
-            RefreshClusterMaxPriorityResponse response =
-                getRouterRMAdminService().refreshClusterMaxPriority(req);
-            return response;
-          }
+        (PrivilegedExceptionAction<RefreshClusterMaxPriorityResponse>) () -> {
+          RefreshClusterMaxPriorityRequest req =
+              RefreshClusterMaxPriorityRequest.newInstance();
+          RefreshClusterMaxPriorityResponse response =
+              getRouterRMAdminService().refreshClusterMaxPriority(req);
+          return response;
         });
   }
 

+ 121 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java

@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Extends the FederationRMAdminInterceptor and overrides methods to provide a
+ * testable implementation of FederationRMAdminInterceptor.
+ */
+public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestFederationRMAdminInterceptor.class);
+
+  ////////////////////////////////
+  // constant information
+  ////////////////////////////////
+  private final static String USER_NAME = "test-user";
+  private final static int NUM_SUBCLUSTER = 4;
+
+  private TestableFederationRMAdminInterceptor interceptor;
+  private FederationStateStoreFacade facade;
+  private MemoryFederationStateStore stateStore;
+  private FederationStateStoreTestUtil stateStoreUtil;
+  private List<SubClusterId> subClusters;
+
+  @Override
+  public void setUp() {
+
+    super.setUpConfig();
+
+    // Initialize facade & stateSore
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(this.getConf());
+    facade = FederationStateStoreFacade.getInstance();
+    facade.reinitialize(stateStore, getConf());
+    stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
+
+    // Initialize interceptor
+    interceptor = new TestableFederationRMAdminInterceptor();
+    interceptor.setConf(this.getConf());
+    interceptor.init(USER_NAME);
+
+    // Storage SubClusters
+    subClusters = new ArrayList<>();
+    try {
+      for (int i = 0; i < NUM_SUBCLUSTER; i++) {
+        SubClusterId sc = SubClusterId.newInstance(Integer.toString(i));
+        stateStoreUtil.registerSubCluster(sc);
+        subClusters.add(sc);
+      }
+    } catch (YarnException e) {
+      LOG.error(e.getMessage());
+      Assert.fail();
+    }
+
+    DefaultMetricsSystem.setMiniClusterMode(true);
+  }
+
+  @Override
+  protected YarnConfiguration createConfiguration() {
+    // Set Enable YarnFederation
+    YarnConfiguration config = new YarnConfiguration();
+    config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+
+    String mockPassThroughInterceptorClass =
+        PassThroughRMAdminRequestInterceptor.class.getName();
+
+    // Create a request interceptor pipeline for testing. The last one in the
+    // chain will call the mock resource manager. The others in the chain will
+    // simply forward it to the next one in the chain
+    config.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
+        mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + "," +
+        TestFederationRMAdminInterceptor.class.getName());
+    return config;
+  }
+
+  @Override
+  public void tearDown() {
+    interceptor.shutdown();
+    super.tearDown();
+  }
+
+  @Test
+  public void testRefreshQueues() throws IOException, YarnException {
+    RefreshQueuesRequest request = RefreshQueuesRequest.newInstance();
+    interceptor.refreshQueues(request);
+  }
+}

+ 99 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestableFederationRMAdminInterceptor.java

@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TestableFederationRMAdminInterceptor extends FederationRMAdminInterceptor {
+
+  // Record log information
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestableFederationRMAdminInterceptor.class);
+
+  // Used to Store the relationship between SubClusterId and RM
+  private ConcurrentHashMap<SubClusterId, MockRM> mockRMs = new ConcurrentHashMap<>();
+
+  // Store Bad subCluster
+  private Set<SubClusterId> badSubCluster = new HashSet<>();
+
+  @Override
+  protected ResourceManagerAdministrationProtocol getAdminRMProxyForSubCluster(
+      SubClusterId subClusterId) throws YarnException {
+    MockRM mockRM;
+    synchronized (this) {
+      if (mockRMs.containsKey(subClusterId)) {
+        mockRM = mockRMs.get(subClusterId);
+      } else {
+        mockRM = new MockRM();
+        if (badSubCluster.contains(subClusterId)) {
+          return new MockRMAdminBadService(mockRM);
+        }
+        mockRM.init(super.getConf());
+        mockRM.start();
+        mockRMs.put(subClusterId, mockRM);
+      }
+      return mockRM.getAdminService();
+    }
+  }
+
+  // This represents an unserviceable SubCluster
+  private class MockRMAdminBadService extends AdminService {
+    MockRMAdminBadService(ResourceManager rm) {
+      super(rm);
+    }
+
+    @Override
+    public void refreshQueues() throws IOException, YarnException {
+      throw new ConnectException("RM is stopped");
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    // if mockRMs is not null
+    if (MapUtils.isNotEmpty(mockRMs)) {
+      for (Map.Entry<SubClusterId, MockRM> item : mockRMs.entrySet()) {
+        SubClusterId subClusterId = item.getKey();
+        // close mockRM.
+        MockRM mockRM = item.getValue();
+        if (mockRM != null) {
+          LOG.info("subClusterId = {} mockRM shutdown.", subClusterId);
+          mockRM.stop();
+        }
+      }
+    }
+    mockRMs.clear();
+    super.shutdown();
+  }
+}