Procházet zdrojové kódy

YARN-7276. Federation Router Web Service fixes. Contributed by Inigo Goiri.

Inigo Goiri před 7 roky
rodič
revize
8be5707067

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java

@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
@@ -74,6 +75,8 @@ public class Router extends CompositeService {
    */
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
+  private static final String METRICS_NAME = "Router";
+
   public Router() {
     super(Router.class.getName());
   }
@@ -95,6 +98,8 @@ public class Router extends CompositeService {
     webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
         YarnConfiguration.ROUTER_BIND_HOST,
         WebAppUtils.getRouterWebAppURLWithoutScheme(this.conf));
+    // Metrics
+    DefaultMetricsSystem.initialize(METRICS_NAME);
     super.serviceInit(conf);
   }
 
@@ -118,6 +123,7 @@ public class Router extends CompositeService {
       return;
     }
     super.serviceStop();
+    DefaultMetricsSystem.shutdown();
   }
 
   protected void shutDown() {

+ 8 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java

@@ -129,7 +129,9 @@ public class DefaultRequestInterceptorREST
   public NodesInfo getNodes(String states) {
     // states will be part of additionalParam
     Map<String, String[]> additionalParam = new HashMap<String, String[]>();
-    additionalParam.put(RMWSConsts.STATES, new String[] {states});
+    if (states != null && !states.isEmpty()) {
+      additionalParam.put(RMWSConsts.STATES, new String[] {states});
+    }
     return RouterWebServiceUtil.genericForward(webAppAddress, null,
         NodesInfo.class, HTTPMethods.GET,
         RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.NODES, null,
@@ -226,9 +228,11 @@ public class DefaultRequestInterceptorREST
   public LabelsToNodesInfo getLabelsToNodes(Set<String> labels)
       throws IOException {
     // labels will be part of additionalParam
-    Map<String, String[]> additionalParam = new HashMap<String, String[]>();
-    additionalParam.put(RMWSConsts.LABELS,
-        labels.toArray(new String[labels.size()]));
+    Map<String, String[]> additionalParam = new HashMap<>();
+    if (labels != null && !labels.isEmpty()) {
+      additionalParam.put(RMWSConsts.LABELS,
+          labels.toArray(new String[labels.size()]));
+    }
     return RouterWebServiceUtil.genericForward(webAppAddress, null,
         LabelsToNodesInfo.class, HTTPMethods.GET,
         RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.LABEL_MAPPINGS, null,

+ 161 - 94
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.router.webapp;
 
 import java.io.IOException;
+import java.security.Principal;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -26,12 +27,15 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletRequestWrapper;
 import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
@@ -48,6 +52,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
 import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -121,29 +126,33 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
   @Override
   public void init(String user) {
     federationFacade = FederationStateStoreFacade.getInstance();
-    rand = new Random(System.currentTimeMillis());
+    rand = new Random();
 
     final Configuration conf = this.getConf();
 
     try {
-      policyFacade = new RouterPolicyFacade(conf, federationFacade,
-          this.federationFacade.getSubClusterResolver(), null);
+      SubClusterResolver subClusterResolver =
+          this.federationFacade.getSubClusterResolver();
+      policyFacade = new RouterPolicyFacade(
+          conf, federationFacade, subClusterResolver, null);
     } catch (FederationPolicyInitializationException e) {
-      LOG.error(e.getMessage());
+      throw new YarnRuntimeException(e);
     }
 
-    numSubmitRetries =
-        conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
-            YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
+    numSubmitRetries = conf.getInt(
+        YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
+        YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
 
-    interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
+    interceptors = new HashMap<>();
     routerMetrics = RouterMetrics.getMetrics();
-    threadpool = HadoopExecutors.newCachedThreadPool(new ThreadFactoryBuilder()
-        .setNameFormat("FederationInterceptorREST #%d").build());
-
-    returnPartialReport =
-        conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED,
-            YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
+    threadpool = HadoopExecutors.newCachedThreadPool(
+        new ThreadFactoryBuilder()
+            .setNameFormat("FederationInterceptorREST #%d")
+            .build());
+
+    returnPartialReport = conf.getBoolean(
+        YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED,
+        YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
   }
 
   private SubClusterId getRandomActiveSubCluster(
@@ -156,8 +165,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     }
     List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
 
-    FederationPolicyUtils.validateSubClusterAvailability(list,
-        blackListSubClusters);
+    FederationPolicyUtils.validateSubClusterAvailability(
+        list, blackListSubClusters);
 
     if (blackListSubClusters != null) {
 
@@ -176,8 +185,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     if (interceptors.containsKey(subClusterId)) {
       return interceptors.get(subClusterId);
     } else {
-      LOG.error("The interceptor for SubCluster " + subClusterId
-          + " does not exist in the cache.");
+      LOG.error(
+          "The interceptor for SubCluster {} does not exist in the cache.",
+          subClusterId);
       return null;
     }
   }
@@ -187,9 +197,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
 
     final Configuration conf = this.getConf();
 
-    String interceptorClassName =
-        conf.get(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
-            YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS);
+    String interceptorClassName = conf.get(
+        YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
+        YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS);
     DefaultRequestInterceptorREST interceptorInstance = null;
     try {
       Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
@@ -210,7 +220,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
           e);
     }
 
-    interceptorInstance.setWebAppAddress(webAppAddress);
+    interceptorInstance.setWebAppAddress("http://" + webAppAddress);
     interceptorInstance.setSubClusterId(subClusterId);
     interceptors.put(subClusterId, interceptorInstance);
     return interceptorInstance;
@@ -272,8 +282,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
             .entity(e.getLocalizedMessage()).build();
       }
 
-      LOG.debug(
-          "getNewApplication try #" + i + " on SubCluster " + subClusterId);
+      LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId);
 
       DefaultRequestInterceptorREST interceptor =
           getOrCreateInterceptorForSubCluster(subClusterId,
@@ -282,11 +291,12 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       try {
         response = interceptor.createNewApplication(hsr);
       } catch (Exception e) {
-        LOG.warn("Unable to create a new ApplicationId in SubCluster "
-            + subClusterId.getId(), e);
+        LOG.warn("Unable to create a new ApplicationId in SubCluster {}",
+            subClusterId.getId(), e);
       }
 
-      if (response != null && response.getStatus() == 200) {
+      if (response != null &&
+          response.getStatus() == HttpServletResponse.SC_OK) {
 
         long stopTime = clock.getTime();
         routerMetrics.succeededAppsCreated(stopTime - startTime);
@@ -302,7 +312,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     String errMsg = "Fail to create a new application.";
     LOG.error(errMsg);
     routerMetrics.incrAppsFailedCreated();
-    return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
+    return Response
+        .status(Status.INTERNAL_SERVER_ERROR)
+        .entity(errMsg)
+        .build();
   }
 
   /**
@@ -381,7 +394,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       routerMetrics.incrAppsFailedSubmitted();
       String errMsg = "Missing ApplicationSubmissionContextInfo or "
           + "applicationSubmissionContex information.";
-      return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
+      return Response
+          .status(Status.BAD_REQUEST)
+          .entity(errMsg)
+          .build();
     }
 
     ApplicationId applicationId = null;
@@ -389,7 +405,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       applicationId = ApplicationId.fromString(newApp.getApplicationId());
     } catch (IllegalArgumentException e) {
       routerMetrics.incrAppsFailedSubmitted();
-      return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
+      return Response
+          .status(Status.BAD_REQUEST)
+          .entity(e.getLocalizedMessage())
           .build();
     }
 
@@ -405,11 +423,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
         subClusterId = policyFacade.getHomeSubcluster(context, blacklist);
       } catch (YarnException e) {
         routerMetrics.incrAppsFailedSubmitted();
-        return Response.status(Status.SERVICE_UNAVAILABLE)
-            .entity(e.getLocalizedMessage()).build();
+        return Response
+            .status(Status.SERVICE_UNAVAILABLE)
+            .entity(e.getLocalizedMessage())
+            .build();
       }
-      LOG.info("submitApplication appId" + applicationId + " try #" + i
-          + " on SubCluster " + subClusterId);
+      LOG.info("submitApplication appId {} try #{} on SubCluster {}",
+          applicationId, i, subClusterId);
 
       ApplicationHomeSubCluster appHomeSubCluster =
           ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
@@ -424,8 +444,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
           routerMetrics.incrAppsFailedSubmitted();
           String errMsg = "Unable to insert the ApplicationId " + applicationId
               + " into the FederationStateStore";
-          return Response.status(Status.SERVICE_UNAVAILABLE)
-              .entity(errMsg + " " + e.getLocalizedMessage()).build();
+          return Response
+              .status(Status.SERVICE_UNAVAILABLE)
+              .entity(errMsg + " " + e.getLocalizedMessage())
+              .build();
         }
       } else {
         try {
@@ -441,15 +463,19 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
                 federationFacade.getApplicationHomeSubCluster(applicationId);
           } catch (YarnException e1) {
             routerMetrics.incrAppsFailedSubmitted();
-            return Response.status(Status.SERVICE_UNAVAILABLE)
-                .entity(e1.getLocalizedMessage()).build();
+            return Response
+                .status(Status.SERVICE_UNAVAILABLE)
+                .entity(e1.getLocalizedMessage())
+                .build();
           }
           if (subClusterId == subClusterIdInStateStore) {
-            LOG.info("Application " + applicationId
-                + " already submitted on SubCluster " + subClusterId);
+            LOG.info("Application {} already submitted on SubCluster {}",
+                applicationId, subClusterId);
           } else {
             routerMetrics.incrAppsFailedSubmitted();
-            return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg)
+            return Response
+                .status(Status.SERVICE_UNAVAILABLE)
+                .entity(errMsg)
                 .build();
           }
         }
@@ -460,8 +486,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
         subClusterInfo = federationFacade.getSubCluster(subClusterId);
       } catch (YarnException e) {
         routerMetrics.incrAppsFailedSubmitted();
-        return Response.status(Status.SERVICE_UNAVAILABLE)
-            .entity(e.getLocalizedMessage()).build();
+        return Response
+            .status(Status.SERVICE_UNAVAILABLE)
+            .entity(e.getLocalizedMessage())
+            .build();
       }
 
       Response response = null;
@@ -470,13 +498,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
             subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp,
                 hsr);
       } catch (Exception e) {
-        LOG.warn("Unable to submit the application " + applicationId
-            + "to SubCluster " + subClusterId.getId(), e);
+        LOG.warn("Unable to submit the application {} to SubCluster {}",
+            applicationId, subClusterId.getId(), e);
       }
 
-      if (response != null && response.getStatus() == 202) {
-        LOG.info("Application " + context.getApplicationName() + " with appId "
-            + applicationId + " submitted on " + subClusterId);
+      if (response != null &&
+          response.getStatus() == HttpServletResponse.SC_ACCEPTED) {
+        LOG.info("Application {} with appId {} submitted on {}",
+            context.getApplicationName(), applicationId, subClusterId);
 
         long stopTime = clock.getTime();
         routerMetrics.succeededAppsSubmitted(stopTime - startTime);
@@ -493,7 +522,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     String errMsg = "Application " + newApp.getApplicationName()
         + " with appId " + applicationId + " failed to be submitted.";
     LOG.error(errMsg);
-    return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg).build();
+    return Response
+        .status(Status.SERVICE_UNAVAILABLE)
+        .entity(errMsg)
+        .build();
   }
 
   /**
@@ -541,9 +573,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       return null;
     }
 
-    AppInfo response = getOrCreateInterceptorForSubCluster(subClusterId,
-        subClusterInfo.getRMWebServiceAddress()).getApp(hsr, appId,
-            unselectedFields);
+    DefaultRequestInterceptorREST interceptor =
+        getOrCreateInterceptorForSubCluster(
+            subClusterId, subClusterInfo.getRMWebServiceAddress());
+    AppInfo response = interceptor.getApp(hsr, appId, unselectedFields);
 
     long stopTime = clock.getTime();
     routerMetrics.succeededAppsRetrieved(stopTime - startTime);
@@ -579,7 +612,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       applicationId = ApplicationId.fromString(appId);
     } catch (IllegalArgumentException e) {
       routerMetrics.incrAppsFailedKilled();
-      return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
+      return Response
+          .status(Status.BAD_REQUEST)
+          .entity(e.getLocalizedMessage())
           .build();
     }
 
@@ -591,7 +626,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       subClusterInfo = federationFacade.getSubCluster(subClusterId);
     } catch (YarnException e) {
       routerMetrics.incrAppsFailedKilled();
-      return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
+      return Response
+          .status(Status.BAD_REQUEST)
+          .entity(e.getLocalizedMessage())
           .build();
     }
 
@@ -642,26 +679,28 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     }
 
     // Send the requests in parallel
-
-    ExecutorCompletionService<AppsInfo> compSvc =
-        new ExecutorCompletionService<AppsInfo>(this.threadpool);
+    CompletionService<AppsInfo> compSvc =
+        new ExecutorCompletionService<>(this.threadpool);
 
     for (final SubClusterInfo info : subClustersActive.values()) {
+      // HttpServletRequest does not work with ExecutorCompletionService.
+      // Create a duplicate hsr.
+      final HttpServletRequest hsrCopy = clone(hsr);
       compSvc.submit(new Callable<AppsInfo>() {
         @Override
         public AppsInfo call() {
           DefaultRequestInterceptorREST interceptor =
-              getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
-                  info.getClientRMServiceAddress());
-          AppsInfo rmApps = interceptor.getApps(hsr, stateQuery, statesQuery,
-              finalStatusQuery, userQuery, queueQuery, count, startedBegin,
-              startedEnd, finishBegin, finishEnd, applicationTypes,
-              applicationTags, unselectedFields);
+              getOrCreateInterceptorForSubCluster(
+                  info.getSubClusterId(), info.getRMWebServiceAddress());
+          AppsInfo rmApps = interceptor.getApps(hsrCopy, stateQuery,
+              statesQuery, finalStatusQuery, userQuery, queueQuery, count,
+              startedBegin, startedEnd, finishBegin, finishEnd,
+              applicationTypes, applicationTags, unselectedFields);
 
           if (rmApps == null) {
             routerMetrics.incrMultipleAppsFailedRetrieved();
-            LOG.error("Subcluster " + info.getSubClusterId()
-                + " failed to return appReport.");
+            LOG.error("Subcluster {} failed to return appReport.",
+                info.getSubClusterId());
             return null;
           }
           return rmApps;
@@ -670,8 +709,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     }
 
     // Collect all the responses in parallel
-
-    for (int i = 0; i < subClustersActive.values().size(); i++) {
+    for (int i = 0; i < subClustersActive.size(); i++) {
       try {
         Future<AppsInfo> future = compSvc.take();
         AppsInfo appsResponse = future.get();
@@ -684,7 +722,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
         }
       } catch (Throwable e) {
         routerMetrics.incrMultipleAppsFailedRetrieved();
-        LOG.warn("Failed to get application report ", e);
+        LOG.warn("Failed to get application report", e);
       }
     }
 
@@ -693,9 +731,41 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     }
 
     // Merge all the application reports got from all the available YARN RMs
+    return RouterWebServiceUtil.mergeAppsInfo(
+        apps.getApps(), returnPartialReport);
+  }
 
-    return RouterWebServiceUtil.mergeAppsInfo(apps.getApps(),
-        returnPartialReport);
+  /**
+   * Get a copy of a HTTP request. This is for thread safety.
+   * @param hsr HTTP servlet request to copy.
+   * @return Copy of the HTTP request.
+   */
+  private HttpServletRequestWrapper clone(final HttpServletRequest hsr) {
+    if (hsr == null) {
+      return null;
+    }
+    return new HttpServletRequestWrapper(hsr) {
+        public Map<String, String[]> getParameterMap() {
+          return hsr.getParameterMap();
+        }
+        public String getPathInfo() {
+          return hsr.getPathInfo();
+        }
+        public String getRemoteUser() {
+          return hsr.getRemoteUser();
+        }
+        public Principal getUserPrincipal() {
+          return hsr.getUserPrincipal();
+        }
+        public String getHeader(String value) {
+          // we override only Accept
+          if (value.equals(HttpHeaders.ACCEPT)) {
+            return RouterWebServiceUtil.getMediaTypeFromHttpServletRequest(
+                hsr, AppsInfo.class);
+          }
+          return null;
+        }
+      };
   }
 
   /**
@@ -729,8 +799,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     }
 
     // Send the requests in parallel
-
-    ExecutorCompletionService<NodeInfo> compSvc =
+    CompletionService<NodeInfo> compSvc =
         new ExecutorCompletionService<NodeInfo>(this.threadpool);
 
     for (final SubClusterInfo info : subClustersActive.values()) {
@@ -738,14 +807,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
         @Override
         public NodeInfo call() {
           DefaultRequestInterceptorREST interceptor =
-              getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
-                  info.getClientRMServiceAddress());
+              getOrCreateInterceptorForSubCluster(
+                  info.getSubClusterId(), info.getRMWebServiceAddress());
           try {
             NodeInfo nodeInfo = interceptor.getNode(nodeId);
             return nodeInfo;
           } catch (Exception e) {
-            LOG.error("Subcluster " + info.getSubClusterId()
-                + " failed to return nodeInfo.");
+            LOG.error("Subcluster {} failed to return nodeInfo.",
+                info.getSubClusterId());
             return null;
           }
         }
@@ -754,7 +823,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
 
     // Collect all the responses in parallel
     NodeInfo nodeInfo = null;
-    for (int i = 0; i < subClustersActive.values().size(); i++) {
+    for (int i = 0; i < subClustersActive.size(); i++) {
       try {
         Future<NodeInfo> future = compSvc.take();
         NodeInfo nodeResponse = future.get();
@@ -763,8 +832,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
         if (nodeResponse != null) {
           // Check if the node was already found in a different SubCluster and
           // it has an old health report
-          if (nodeInfo == null || nodeInfo.getLastHealthUpdate() < nodeResponse
-              .getLastHealthUpdate()) {
+          if (nodeInfo == null || nodeInfo.getLastHealthUpdate() <
+              nodeResponse.getLastHealthUpdate()) {
             nodeInfo = nodeResponse;
           }
         }
@@ -806,13 +875,12 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     try {
       subClustersActive = federationFacade.getSubClusters(true);
     } catch (YarnException e) {
-      LOG.error(e.getMessage());
+      LOG.error("Cannot get nodes: {}", e.getMessage());
       return new NodesInfo();
     }
 
     // Send the requests in parallel
-
-    ExecutorCompletionService<NodesInfo> compSvc =
+    CompletionService<NodesInfo> compSvc =
         new ExecutorCompletionService<NodesInfo>(this.threadpool);
 
     for (final SubClusterInfo info : subClustersActive.values()) {
@@ -820,14 +888,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
         @Override
         public NodesInfo call() {
           DefaultRequestInterceptorREST interceptor =
-              getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
-                  info.getClientRMServiceAddress());
+              getOrCreateInterceptorForSubCluster(
+                  info.getSubClusterId(), info.getRMWebServiceAddress());
           try {
             NodesInfo nodesInfo = interceptor.getNodes(states);
             return nodesInfo;
           } catch (Exception e) {
-            LOG.error("Subcluster " + info.getSubClusterId()
-                + " failed to return nodesInfo.");
+            LOG.error("Subcluster {} failed to return nodesInfo.",
+                info.getSubClusterId());
             return null;
           }
         }
@@ -836,7 +904,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
 
     // Collect all the responses in parallel
 
-    for (int i = 0; i < subClustersActive.values().size(); i++) {
+    for (int i = 0; i < subClustersActive.size(); i++) {
       try {
         Future<NodesInfo> future = compSvc.take();
         NodesInfo nodesResponse = future.get();
@@ -870,8 +938,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     }
 
     // Send the requests in parallel
-
-    ExecutorCompletionService<ClusterMetricsInfo> compSvc =
+    CompletionService<ClusterMetricsInfo> compSvc =
         new ExecutorCompletionService<ClusterMetricsInfo>(this.threadpool);
 
     for (final SubClusterInfo info : subClustersActive.values()) {
@@ -879,14 +946,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
         @Override
         public ClusterMetricsInfo call() {
           DefaultRequestInterceptorREST interceptor =
-              getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
-                  info.getClientRMServiceAddress());
+              getOrCreateInterceptorForSubCluster(
+                  info.getSubClusterId(), info.getRMWebServiceAddress());
           try {
             ClusterMetricsInfo metrics = interceptor.getClusterMetricsInfo();
             return metrics;
           } catch (Exception e) {
-            LOG.error("Subcluster " + info.getSubClusterId()
-                + " failed to return Cluster Metrics.");
+            LOG.error("Subcluster {} failed to return Cluster Metrics.",
+                info.getSubClusterId());
             return null;
           }
         }
@@ -895,7 +962,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
 
     // Collect all the responses in parallel
 
-    for (int i = 0; i < subClustersActive.values().size(); i++) {
+    for (int i = 0; i < subClustersActive.size(); i++) {
       try {
         Future<ClusterMetricsInfo> future = compSvc.take();
         ClusterMetricsInfo metricsResponse = future.get();

+ 46 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java

@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.server.router.webapp;
 
+import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
+import static javax.servlet.http.HttpServletResponse.SC_OK;
+
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -28,6 +31,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
@@ -45,6 +49,8 @@ import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.ForbiddenException;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.sun.jersey.api.ConflictException;
 import com.sun.jersey.api.client.Client;
@@ -52,8 +58,6 @@ import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 import com.sun.jersey.api.client.WebResource.Builder;
 import com.sun.jersey.core.util.MultivaluedMapImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The Router webservice util class.
@@ -85,9 +89,11 @@ public final class RouterWebServiceUtil {
    *          call in case the call has no servlet request
    * @return the retrieved entity from the REST call
    */
-  protected static <T> T genericForward(String webApp, HttpServletRequest hsr,
-      final Class<T> returnType, HTTPMethods method, String targetPath,
-      Object formParam, Map<String, String[]> additionalParam) {
+  protected static <T> T genericForward(
+      final String webApp, final HttpServletRequest hsr,
+      final Class<T> returnType, final HTTPMethods method,
+      final String targetPath, final Object formParam,
+      final Map<String, String[]> additionalParam) {
 
     UserGroupInformation callerUGI = null;
 
@@ -121,14 +127,22 @@ public final class RouterWebServiceUtil {
 
           ClientResponse response = RouterWebServiceUtil.invokeRMWebService(
               webApp, targetPath, method,
-              (hsr == null) ? null : hsr.getPathInfo(), paramMap, formParam);
+              (hsr == null) ? null : hsr.getPathInfo(), paramMap, formParam,
+              getMediaTypeFromHttpServletRequest(hsr, returnType));
           if (Response.class.equals(returnType)) {
             return (T) RouterWebServiceUtil.clientResponseToResponse(response);
           }
           // YARN RM can answer with Status.OK or it throws an exception
-          if (response.getStatus() == 200) {
+          if (response.getStatus() == SC_OK) {
             return response.getEntity(returnType);
           }
+          if (response.getStatus() == SC_NO_CONTENT) {
+            try {
+              return returnType.getConstructor().newInstance();
+            } catch (RuntimeException | ReflectiveOperationException e) {
+              LOG.error("Cannot create empty entity for {}", returnType, e);
+            }
+          }
           RouterWebServiceUtil.retrieveException(response);
           return null;
         }
@@ -147,7 +161,7 @@ public final class RouterWebServiceUtil {
    */
   private static ClientResponse invokeRMWebService(String webApp, String path,
       HTTPMethods method, String additionalPath,
-      Map<String, String[]> queryParams, Object formParam) {
+      Map<String, String[]> queryParams, Object formParam, String mediaType) {
     Client client = Client.create();
 
     WebResource webResource = client.resource(webApp).path(path);
@@ -168,14 +182,12 @@ public final class RouterWebServiceUtil {
       webResource = webResource.queryParams(paramMap);
     }
 
-    // I can forward the call in JSON or XML since the Router will convert it
-    // again in Object before send it back to the client
     Builder builder = null;
     if (formParam != null) {
-      builder = webResource.entity(formParam, MediaType.APPLICATION_XML);
-      builder = builder.accept(MediaType.APPLICATION_XML);
+      builder = webResource.entity(formParam, mediaType);
+      builder = builder.accept(mediaType);
     } else {
-      builder = webResource.accept(MediaType.APPLICATION_XML);
+      builder = webResource.accept(mediaType);
     }
 
     ClientResponse response = null;
@@ -428,4 +440,25 @@ public final class RouterWebServiceUtil {
         + metricsResponse.getShutdownNodes());
   }
 
+  /**
+   * Extract from HttpServletRequest the MediaType in output.
+   */
+  protected static <T> String getMediaTypeFromHttpServletRequest(
+      HttpServletRequest request, final Class<T> returnType) {
+    if (request == null) {
+      // By default we return XML for REST call without HttpServletRequest
+      return MediaType.APPLICATION_XML;
+    }
+    // TODO
+    if (!returnType.equals(Response.class)) {
+      return MediaType.APPLICATION_XML;
+    }
+    String header = request.getHeader(HttpHeaders.ACCEPT);
+    if (header == null || header.equals("*")) {
+      // By default we return JSON
+      return MediaType.APPLICATION_JSON;
+    }
+    return header;
+  }
+
 }

+ 52 - 45
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java

@@ -158,12 +158,19 @@ public class RouterWebServices implements RMWebServiceProtocol {
   }
 
   @VisibleForTesting
-  protected RequestInterceptorChainWrapper getInterceptorChain() {
+  protected RequestInterceptorChainWrapper getInterceptorChain(
+      final HttpServletRequest hsr) {
     String user = "";
+    if (hsr != null) {
+      user = hsr.getRemoteUser();
+    }
     try {
-      user = UserGroupInformation.getCurrentUser().getUserName();
+      if (user == null || user.equals("")) {
+        // Yarn Router user
+        user = UserGroupInformation.getCurrentUser().getUserName();
+      }
     } catch (IOException e) {
-      LOG.error("IOException " + e.getMessage());
+      LOG.error("Cannot get user: {}", e.getMessage());
     }
     if (!userPipelineMap.containsKey(user)) {
       initializePipeline(user);
@@ -316,7 +323,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   @Override
   public ClusterInfo getClusterInfo() {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
     return pipeline.getRootInterceptor().getClusterInfo();
   }
 
@@ -327,7 +334,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   @Override
   public ClusterMetricsInfo getClusterMetricsInfo() {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
     return pipeline.getRootInterceptor().getClusterMetricsInfo();
   }
 
@@ -338,7 +345,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   @Override
   public SchedulerTypeInfo getSchedulerInfo() {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
     return pipeline.getRootInterceptor().getSchedulerInfo();
   }
 
@@ -350,7 +357,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   public String dumpSchedulerLogs(@FormParam(RMWSConsts.TIME) String time,
       @Context HttpServletRequest hsr) throws IOException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().dumpSchedulerLogs(time, hsr);
   }
 
@@ -361,7 +368,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   @Override
   public NodesInfo getNodes(@QueryParam(RMWSConsts.STATES) String states) {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
     return pipeline.getRootInterceptor().getNodes(states);
   }
 
@@ -372,7 +379,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   @Override
   public NodeInfo getNode(@PathParam(RMWSConsts.NODEID) String nodeId) {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
     return pipeline.getRootInterceptor().getNode(nodeId);
   }
 
@@ -396,7 +403,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @QueryParam(RMWSConsts.APPLICATION_TAGS) Set<String> applicationTags,
       @QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().getApps(hsr, stateQuery, statesQuery,
         finalStatusQuery, userQuery, queueQuery, count, startedBegin,
         startedEnd, finishBegin, finishEnd, applicationTypes, applicationTags,
@@ -411,7 +418,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   public ActivitiesInfo getActivities(@Context HttpServletRequest hsr,
       @QueryParam(RMWSConsts.NODEID) String nodeId) {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().getActivities(hsr, nodeId);
   }
 
@@ -424,7 +431,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @QueryParam(RMWSConsts.APP_ID) String appId,
       @QueryParam(RMWSConsts.MAX_TIME) String time) {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().getAppActivities(hsr, appId, time);
   }
 
@@ -438,7 +445,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @QueryParam(RMWSConsts.STATES) Set<String> stateQueries,
       @QueryParam(RMWSConsts.APPLICATION_TYPES) Set<String> typeQueries) {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().getAppStatistics(hsr, stateQueries,
         typeQueries);
   }
@@ -452,7 +459,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @PathParam(RMWSConsts.APPID) String appId,
       @QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().getApp(hsr, appId, unselectedFields);
   }
 
@@ -464,7 +471,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   public AppState getAppState(@Context HttpServletRequest hsr,
       @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().getAppState(hsr, appId);
   }
 
@@ -478,7 +485,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
       YarnException, InterruptedException, IOException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().updateAppState(targetState, hsr,
         appId);
   }
@@ -491,7 +498,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   public NodeToLabelsInfo getNodeToLabels(@Context HttpServletRequest hsr)
       throws IOException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().getNodeToLabels(hsr);
   }
 
@@ -503,7 +510,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   public LabelsToNodesInfo getLabelsToNodes(
       @QueryParam(RMWSConsts.LABELS) Set<String> labels) throws IOException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
     return pipeline.getRootInterceptor().getLabelsToNodes(labels);
   }
 
@@ -516,7 +523,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       final NodeToLabelsEntryList newNodeToLabels,
       @Context HttpServletRequest hsr) throws Exception {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().replaceLabelsOnNodes(newNodeToLabels,
         hsr);
   }
@@ -531,7 +538,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @Context HttpServletRequest hsr,
       @PathParam(RMWSConsts.NODEID) String nodeId) throws Exception {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().replaceLabelsOnNode(newNodeLabelsName,
         hsr, nodeId);
   }
@@ -544,7 +551,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   public NodeLabelsInfo getClusterNodeLabels(@Context HttpServletRequest hsr)
       throws IOException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().getClusterNodeLabels(hsr);
   }
 
@@ -556,7 +563,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels,
       @Context HttpServletRequest hsr) throws Exception {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().addToClusterNodeLabels(newNodeLabels,
         hsr);
   }
@@ -570,7 +577,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @QueryParam(RMWSConsts.LABELS) Set<String> oldNodeLabels,
       @Context HttpServletRequest hsr) throws Exception {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor()
         .removeFromCluserNodeLabels(oldNodeLabels, hsr);
   }
@@ -583,7 +590,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   public NodeLabelsInfo getLabelsOnNode(@Context HttpServletRequest hsr,
       @PathParam(RMWSConsts.NODEID) String nodeId) throws IOException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().getLabelsOnNode(hsr, nodeId);
   }
 
@@ -595,7 +602,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   public AppPriority getAppPriority(@Context HttpServletRequest hsr,
       @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().getAppPriority(hsr, appId);
   }
 
@@ -609,7 +616,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
       YarnException, InterruptedException, IOException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor()
         .updateApplicationPriority(targetPriority, hsr, appId);
   }
@@ -622,7 +629,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   public AppQueue getAppQueue(@Context HttpServletRequest hsr,
       @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().getAppQueue(hsr, appId);
   }
 
@@ -636,7 +643,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
       YarnException, InterruptedException, IOException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().updateAppQueue(targetQueue, hsr,
         appId);
   }
@@ -649,7 +656,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   public Response createNewApplication(@Context HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().createNewApplication(hsr);
   }
 
@@ -662,7 +669,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @Context HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().submitApplication(newApp, hsr);
   }
 
@@ -675,7 +682,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @Context HttpServletRequest hsr) throws AuthorizationException,
       IOException, InterruptedException, Exception {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().postDelegationToken(tokenData, hsr);
   }
 
@@ -687,7 +694,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   public Response postDelegationTokenExpiration(@Context HttpServletRequest hsr)
       throws AuthorizationException, IOException, Exception {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().postDelegationTokenExpiration(hsr);
   }
 
@@ -700,7 +707,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       throws AuthorizationException, IOException, InterruptedException,
       Exception {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().cancelDelegationToken(hsr);
   }
 
@@ -712,7 +719,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   public Response createNewReservation(@Context HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().createNewReservation(hsr);
   }
 
@@ -725,7 +732,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @Context HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().submitReservation(resContext, hsr);
   }
 
@@ -738,7 +745,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @Context HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().updateReservation(resContext, hsr);
   }
 
@@ -751,7 +758,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @Context HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().deleteReservation(resContext, hsr);
   }
 
@@ -768,7 +775,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @QueryParam(RMWSConsts.INCLUDE_RESOURCE) @DefaultValue(DEFAULT_INCLUDE_RESOURCE) boolean includeResourceAllocations,
       @Context HttpServletRequest hsr) throws Exception {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().listReservation(queue, reservationId,
         startTime, endTime, includeResourceAllocations, hsr);
   }
@@ -782,7 +789,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @PathParam(RMWSConsts.APPID) String appId,
       @PathParam(RMWSConsts.TYPE) String type) throws AuthorizationException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().getAppTimeout(hsr, appId, type);
   }
 
@@ -794,7 +801,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   public AppTimeoutsInfo getAppTimeouts(@Context HttpServletRequest hsr,
       @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().getAppTimeouts(hsr, appId);
   }
 
@@ -808,7 +815,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
       YarnException, InterruptedException, IOException {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().updateApplicationTimeout(appTimeout,
         hsr, appId);
   }
@@ -821,7 +828,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
   public AppAttemptsInfo getAppAttempts(@Context HttpServletRequest hsr,
       @PathParam(RMWSConsts.APPID) String appId) {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().getAppAttempts(hsr, appId);
   }
 
@@ -834,7 +841,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @PathParam(RMWSConsts.APPID) String appId,
       @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(req);
     return pipeline.getRootInterceptor().getAppAttempt(req, res, appId,
         appAttemptId);
   }
@@ -848,7 +855,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @PathParam(RMWSConsts.APPID) String appId,
       @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(req);
     return pipeline.getRootInterceptor().getContainers(req, res, appId,
         appAttemptId);
   }
@@ -863,7 +870,7 @@ public class RouterWebServices implements RMWebServiceProtocol {
       @PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId,
       @PathParam(RMWSConsts.CONTAINERID) String containerId) {
     init();
-    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(req);
     return pipeline.getRootInterceptor().getContainer(req, res, appId,
         appAttemptId, containerId);
   }

+ 88 - 312
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java

@@ -20,15 +20,15 @@ package org.apache.hadoop.yarn.server.router.webapp;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 
+import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.Response;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -128,487 +128,263 @@ public abstract class BaseRouterWebServicesTest {
 
   protected ClusterInfo get(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<ClusterInfo>() {
-          @Override
-          public ClusterInfo run() throws Exception {
-            return routerWebService.get();
-          }
-        });
+    // HSR is not used here
+    return routerWebService.get();
   }
 
   protected ClusterInfo getClusterInfo(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<ClusterInfo>() {
-          @Override
-          public ClusterInfo run() throws Exception {
-            return routerWebService.getClusterInfo();
-          }
-        });
+    // HSR is not used here
+    return routerWebService.getClusterInfo();
   }
 
   protected ClusterMetricsInfo getClusterMetricsInfo(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<ClusterMetricsInfo>() {
-          @Override
-          public ClusterMetricsInfo run() throws Exception {
-            return routerWebService.getClusterMetricsInfo();
-          }
-        });
+    // HSR is not used here
+    return routerWebService.getClusterMetricsInfo();
   }
 
   protected SchedulerTypeInfo getSchedulerInfo(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<SchedulerTypeInfo>() {
-          @Override
-          public SchedulerTypeInfo run() throws Exception {
-            return routerWebService.getSchedulerInfo();
-          }
-        });
+    // HSR is not used here
+    return routerWebService.getSchedulerInfo();
   }
 
   protected String dumpSchedulerLogs(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<String>() {
-          @Override
-          public String run() throws Exception {
-            return routerWebService.dumpSchedulerLogs(null, null);
-          }
-        });
+    return routerWebService.dumpSchedulerLogs(null,
+        createHttpServletRequest(user));
   }
 
   protected NodesInfo getNodes(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<NodesInfo>() {
-          @Override
-          public NodesInfo run() throws Exception {
-            return routerWebService.getNodes(null);
-          }
-        });
+    return routerWebService.getNodes(null);
   }
 
   protected NodeInfo getNode(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<NodeInfo>() {
-          @Override
-          public NodeInfo run() throws Exception {
-            return routerWebService.getNode(null);
-          }
-        });
+    return routerWebService.getNode(null);
   }
 
   protected AppsInfo getApps(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<AppsInfo>() {
-          @Override
-          public AppsInfo run() throws Exception {
-            return routerWebService.getApps(null, null, null, null, null, null,
-                null, null, null, null, null, null, null, null);
-          }
-        });
+    return routerWebService.getApps(createHttpServletRequest(user), null, null,
+        null, null, null, null, null, null, null, null, null, null, null);
   }
 
   protected ActivitiesInfo getActivities(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<ActivitiesInfo>() {
-          @Override
-          public ActivitiesInfo run() throws Exception {
-            return routerWebService.getActivities(null, null);
-          }
-        });
+    return routerWebService.getActivities(
+        createHttpServletRequest(user), null);
   }
 
   protected AppActivitiesInfo getAppActivities(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<AppActivitiesInfo>() {
-          @Override
-          public AppActivitiesInfo run() throws Exception {
-            return routerWebService.getAppActivities(null, null, null);
-          }
-        });
+    return routerWebService.getAppActivities(
+        createHttpServletRequest(user), null, null);
   }
 
   protected ApplicationStatisticsInfo getAppStatistics(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<ApplicationStatisticsInfo>() {
-          @Override
-          public ApplicationStatisticsInfo run() throws Exception {
-            return routerWebService.getAppStatistics(null, null, null);
-          }
-        });
+    return routerWebService.getAppStatistics(
+        createHttpServletRequest(user), null, null);
   }
 
   protected AppInfo getApp(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<AppInfo>() {
-          @Override
-          public AppInfo run() throws Exception {
-            return routerWebService.getApp(null, null, null);
-          }
-        });
+    return routerWebService.getApp(createHttpServletRequest(user), null, null);
   }
 
   protected AppState getAppState(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<AppState>() {
-          @Override
-          public AppState run() throws Exception {
-            return routerWebService.getAppState(null, null);
-          }
-        });
+    return routerWebService.getAppState(createHttpServletRequest(user), null);
   }
 
   protected Response updateAppState(String user) throws AuthorizationException,
       YarnException, InterruptedException, IOException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.updateAppState(null, null, null);
-          }
-        });
+    return routerWebService.updateAppState(
+        null, createHttpServletRequest(user), null);
   }
 
   protected NodeToLabelsInfo getNodeToLabels(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<NodeToLabelsInfo>() {
-          @Override
-          public NodeToLabelsInfo run() throws Exception {
-            return routerWebService.getNodeToLabels(null);
-          }
-        });
+    return routerWebService.getNodeToLabels(createHttpServletRequest(user));
   }
 
   protected LabelsToNodesInfo getLabelsToNodes(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<LabelsToNodesInfo>() {
-          @Override
-          public LabelsToNodesInfo run() throws Exception {
-            return routerWebService.getLabelsToNodes(null);
-          }
-        });
+    return routerWebService.getLabelsToNodes(null);
   }
 
   protected Response replaceLabelsOnNodes(String user) throws Exception {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.replaceLabelsOnNodes(null, null);
-          }
-        });
+    return routerWebService.replaceLabelsOnNodes(
+        null, createHttpServletRequest(user));
   }
 
   protected Response replaceLabelsOnNode(String user) throws Exception {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.replaceLabelsOnNode(null, null, null);
-          }
-        });
+    return routerWebService.replaceLabelsOnNode(
+        null, createHttpServletRequest(user), null);
   }
 
   protected NodeLabelsInfo getClusterNodeLabels(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<NodeLabelsInfo>() {
-          @Override
-          public NodeLabelsInfo run() throws Exception {
-            return routerWebService.getClusterNodeLabels(null);
-          }
-        });
+    return routerWebService.getClusterNodeLabels(
+        createHttpServletRequest(user));
   }
 
   protected Response addToClusterNodeLabels(String user) throws Exception {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.addToClusterNodeLabels(null, null);
-          }
-        });
+    return routerWebService.addToClusterNodeLabels(
+        null, createHttpServletRequest(user));
   }
 
   protected Response removeFromCluserNodeLabels(String user) throws Exception {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.removeFromCluserNodeLabels(null, null);
-          }
-        });
+    return routerWebService.removeFromCluserNodeLabels(
+        null, createHttpServletRequest(user));
   }
 
   protected NodeLabelsInfo getLabelsOnNode(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<NodeLabelsInfo>() {
-          @Override
-          public NodeLabelsInfo run() throws Exception {
-            return routerWebService.getLabelsOnNode(null, null);
-          }
-        });
+    return routerWebService.getLabelsOnNode(
+        createHttpServletRequest(user), null);
   }
 
   protected AppPriority getAppPriority(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<AppPriority>() {
-          @Override
-          public AppPriority run() throws Exception {
-            return routerWebService.getAppPriority(null, null);
-          }
-        });
+    return routerWebService.getAppPriority(
+        createHttpServletRequest(user), null);
   }
 
   protected Response updateApplicationPriority(String user)
       throws AuthorizationException, YarnException, InterruptedException,
       IOException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.updateApplicationPriority(null, null, null);
-          }
-        });
+    return routerWebService.updateApplicationPriority(
+        null, createHttpServletRequest(user), null);
   }
 
   protected AppQueue getAppQueue(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<AppQueue>() {
-          @Override
-          public AppQueue run() throws Exception {
-            return routerWebService.getAppQueue(null, null);
-          }
-        });
+    return routerWebService.getAppQueue(createHttpServletRequest(user), null);
   }
 
   protected Response updateAppQueue(String user) throws AuthorizationException,
       YarnException, InterruptedException, IOException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.updateAppQueue(null, null, null);
-          }
-        });
+    return routerWebService.updateAppQueue(
+        null, createHttpServletRequest(user), null);
   }
 
   protected Response createNewApplication(String user)
       throws AuthorizationException, IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.createNewApplication(null);
-          }
-        });
+    return routerWebService.createNewApplication(
+        createHttpServletRequest(user));
   }
 
   protected Response submitApplication(String user)
       throws AuthorizationException, IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.submitApplication(null, null);
-          }
-        });
+    return routerWebService.submitApplication(
+        null, createHttpServletRequest(user));
   }
 
   protected Response postDelegationToken(String user)
       throws AuthorizationException, IOException, InterruptedException,
       Exception {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.postDelegationToken(null, null);
-          }
-        });
+    return routerWebService.postDelegationToken(
+        null, createHttpServletRequest(user));
   }
 
   protected Response postDelegationTokenExpiration(String user)
       throws AuthorizationException, IOException, Exception {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.postDelegationTokenExpiration(null);
-          }
-        });
+    return routerWebService.postDelegationTokenExpiration(
+        createHttpServletRequest(user));
   }
 
   protected Response cancelDelegationToken(String user)
       throws AuthorizationException, IOException, InterruptedException,
       Exception {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.cancelDelegationToken(null);
-          }
-        });
+    return routerWebService.cancelDelegationToken(
+        createHttpServletRequest(user));
   }
 
   protected Response createNewReservation(String user)
       throws AuthorizationException, IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.createNewReservation(null);
-          }
-        });
+    return routerWebService.createNewReservation(
+        createHttpServletRequest(user));
   }
 
   protected Response submitReservation(String user)
       throws AuthorizationException, IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.submitReservation(null, null);
-          }
-        });
+    return routerWebService.submitReservation(
+        null, createHttpServletRequest(user));
   }
 
   protected Response updateReservation(String user)
       throws AuthorizationException, IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.updateReservation(null, null);
-          }
-        });
+    return routerWebService.updateReservation(
+        null, createHttpServletRequest(user));
   }
 
   protected Response deleteReservation(String user)
       throws AuthorizationException, IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.deleteReservation(null, null);
-          }
-        });
+    return routerWebService.deleteReservation(
+        null, createHttpServletRequest(user));
   }
 
   protected Response listReservation(String user) throws Exception {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.listReservation(null, null, 0, 0, false,
-                null);
-          }
-        });
+    return routerWebService.listReservation(
+        null, null, 0, 0, false, createHttpServletRequest(user));
   }
 
   protected AppTimeoutInfo getAppTimeout(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<AppTimeoutInfo>() {
-          @Override
-          public AppTimeoutInfo run() throws Exception {
-            return routerWebService.getAppTimeout(null, null, null);
-          }
-        });
+    return routerWebService.getAppTimeout(
+        createHttpServletRequest(user), null, null);
   }
 
   protected AppTimeoutsInfo getAppTimeouts(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<AppTimeoutsInfo>() {
-          @Override
-          public AppTimeoutsInfo run() throws Exception {
-            return routerWebService.getAppTimeouts(null, null);
-          }
-        });
+    return routerWebService.getAppTimeouts(
+        createHttpServletRequest(user), null);
   }
 
   protected Response updateApplicationTimeout(String user)
       throws AuthorizationException, YarnException, InterruptedException,
       IOException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<Response>() {
-          @Override
-          public Response run() throws Exception {
-            return routerWebService.updateApplicationTimeout(null, null, null);
-          }
-        });
+    return routerWebService.updateApplicationTimeout(
+        null, createHttpServletRequest(user), null);
   }
 
   protected AppAttemptsInfo getAppAttempts(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<AppAttemptsInfo>() {
-          @Override
-          public AppAttemptsInfo run() throws Exception {
-            return routerWebService.getAppAttempts(null, null);
-          }
-        });
+    return routerWebService.getAppAttempts(
+        createHttpServletRequest(user), null);
   }
 
   protected AppAttemptInfo getAppAttempt(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<AppAttemptInfo>() {
-          @Override
-          public AppAttemptInfo run() throws Exception {
-            return routerWebService.getAppAttempt(null, null, null, null);
-          }
-        });
+    return routerWebService.getAppAttempt(
+        createHttpServletRequest(user), null, null, null);
   }
 
   protected ContainersInfo getContainers(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<ContainersInfo>() {
-          @Override
-          public ContainersInfo run() throws Exception {
-            return routerWebService.getContainers(null, null, null, null);
-          }
-        });
+    return routerWebService.getContainers(
+        createHttpServletRequest(user), null, null, null);
   }
 
   protected ContainerInfo getContainer(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<ContainerInfo>() {
-          @Override
-          public ContainerInfo run() throws Exception {
-            return routerWebService.getContainer(null, null, null, null, null);
-          }
-        });
+    return routerWebService.getContainer(
+        createHttpServletRequest(user), null, null, null, null);
   }
 
   protected RequestInterceptorChainWrapper getInterceptorChain(String user)
       throws IOException, InterruptedException {
-    return UserGroupInformation.createRemoteUser(user)
-        .doAs(new PrivilegedExceptionAction<RequestInterceptorChainWrapper>() {
-          @Override
-          public RequestInterceptorChainWrapper run() throws Exception {
-            return routerWebService.getInterceptorChain();
-          }
-        });
+    HttpServletRequest request = createHttpServletRequest(user);
+    return routerWebService.getInterceptorChain(request);
   }
 
+  private HttpServletRequest createHttpServletRequest(String user) {
+    HttpServletRequest request = mock(HttpServletRequest.class);
+    when(request.getRemoteUser()).thenReturn(user);
+    return request;
+  }
 }

+ 13 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/JavaProcess.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.router.webapp;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Helper class to start a new process.
@@ -28,13 +29,23 @@ public class JavaProcess {
 
   private Process process = null;
 
-  public JavaProcess(Class<?> klass) throws IOException, InterruptedException {
+  public JavaProcess(Class<?> clazz) throws IOException, InterruptedException {
+    this(clazz, null);
+  }
+
+  public JavaProcess(Class<?> clazz, List<String> addClasspaths)
+      throws IOException, InterruptedException {
     String javaHome = System.getProperty("java.home");
     String javaBin =
         javaHome + File.separator + "bin" + File.separator + "java";
     String classpath = System.getProperty("java.class.path");
     classpath = classpath.concat("./src/test/resources");
-    String className = klass.getCanonicalName();
+    if (addClasspaths != null) {
+      for (String addClasspath : addClasspaths) {
+        classpath = classpath.concat(File.pathSeparatorChar + addClasspath);
+      }
+    }
+    String className = clazz.getCanonicalName();
     ProcessBuilder builder =
         new ProcessBuilder(javaBin, "-cp", classpath, className);
     builder.inheritIO();

Rozdílová data souboru nebyla zobrazena, protože soubor je příliš velký
+ 608 - 503
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServicesREST.java


Některé soubory nejsou zobrazeny, neboť je v těchto rozdílových datech změněno mnoho souborů