Browse Source

YARN-11442. Refactor FederationInterceptorREST Code. (#5420)

slfan1989 2 years ago
parent
commit
eb1d3ebe2f

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java

@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.Random;
+import java.util.Collection;
 
 import javax.cache.Cache;
 import javax.cache.CacheManager;
@@ -93,6 +94,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1211,4 +1213,23 @@ public final class FederationStateStoreFacade {
     }
     return false;
   }
+
+  /**
+   * Get active subclusters.
+   *
+   * @return We will return a list of active subclusters as a Collection.
+   */
+  public Collection<SubClusterInfo> getActiveSubClusters()
+      throws NotFoundException {
+    try {
+      Map<SubClusterId, SubClusterInfo> subClusterMap = getSubClusters(true);
+      if (MapUtils.isEmpty(subClusterMap)) {
+        throw new NotFoundException("Not Found SubClusters.");
+      }
+      return subClusterMap.values();
+    } catch (Exception e) {
+      LOG.error("getActiveSubClusters failed.", e);
+      return null;
+    }
+  }
 }

File diff suppressed because it is too large
+ 205 - 321
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java


+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java

@@ -111,8 +111,8 @@ public final class RouterWebServiceUtil {
    * @param formParam the form parameters as input for a specific REST call
    * @param additionalParam the query parameters as input for a specific REST
    *          call in case the call has no servlet request
+   * @param conf configuration.
    * @param client same client used to reduce number of clients created
-   * @param conf configuration
    * @return the retrieved entity from the REST call
    */
   protected static <T> T genericForward(final String webApp,
@@ -510,6 +510,11 @@ public final class RouterWebServiceUtil {
 
   /**
    * Extract from HttpServletRequest the MediaType in output.
+   *
+   * @param request the servlet request.
+   * @param returnType the return type of the REST call.
+   * @param <T> Generic Type T.
+   * @return MediaType.
    */
   protected static <T> String getMediaTypeFromHttpServletRequest(
       HttpServletRequest request, final Class<T> returnType) {

+ 39 - 60
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java

@@ -145,8 +145,6 @@ import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.junit.Assert;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
@@ -170,11 +168,11 @@ import static org.mockito.Mockito.when;
  * reused to validate different request interceptor chains.
  */
 public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestFederationInterceptorREST.class);
+
   private final static int NUM_SUBCLUSTER = 4;
   private static final int BAD_REQUEST = 400;
   private static final int ACCEPTED = 202;
+  private static final String TEST_USER = "test-user";
   private static final int OK = 200;
   private static String user = "test-user";
   private TestableFederationInterceptorREST interceptor;
@@ -195,7 +193,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
 
     interceptor.setConf(this.getConf());
-    interceptor.init(user);
+    interceptor.init(TEST_USER);
 
     subClusters = new ArrayList<>();
 
@@ -282,8 +280,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
    * ApplicationId has to belong to one of the SubCluster in the cluster.
    */
   @Test
-  public void testGetNewApplication()
-      throws YarnException, IOException, InterruptedException {
+  public void testGetNewApplication() throws IOException, InterruptedException {
 
     Response response = interceptor.createNewApplication(null);
 
@@ -359,8 +356,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
    * request.
    */
   @Test
-  public void testSubmitApplicationEmptyRequest()
-      throws YarnException, IOException, InterruptedException {
+  public void testSubmitApplicationEmptyRequest() throws IOException, InterruptedException {
 
     // ApplicationSubmissionContextInfo null
     Response response = interceptor.submitApplication(null, null);
@@ -384,8 +380,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
    * application in wrong format.
    */
   @Test
-  public void testSubmitApplicationWrongFormat()
-      throws YarnException, IOException, InterruptedException {
+  public void testSubmitApplicationWrongFormat() throws IOException, InterruptedException {
 
     ApplicationSubmissionContextInfo context =
         new ApplicationSubmissionContextInfo();
@@ -506,8 +501,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
    * application does not exist in StateStore.
    */
   @Test
-  public void testGetApplicationNotExists()
-      throws YarnException, IOException, InterruptedException {
+  public void testGetApplicationNotExists() {
 
     ApplicationId appId =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
@@ -522,8 +516,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
    * application in wrong format.
    */
   @Test
-  public void testGetApplicationWrongFormat()
-      throws YarnException, IOException, InterruptedException {
+  public void testGetApplicationWrongFormat() {
 
     AppInfo response = interceptor.getApp(null, "Application_wrong_id", null);
 
@@ -535,8 +528,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
    * subcluster provided one application.
    */
   @Test
-  public void testGetApplicationsReport()
-      throws YarnException, IOException, InterruptedException {
+  public void testGetApplicationsReport() {
 
     AppsInfo responseGet = interceptor.getApps(null, null, null, null, null,
         null, null, null, null, null, null, null, null, null, null);
@@ -645,8 +637,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
    * application does not exist in StateStore.
    */
   @Test
-  public void testGetApplicationStateNotExists()
-      throws YarnException, IOException, InterruptedException {
+  public void testGetApplicationStateNotExists() throws IOException {
 
     ApplicationId appId =
         ApplicationId.newInstance(Time.now(), 1);
@@ -662,7 +653,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
    */
   @Test
   public void testGetApplicationStateWrongFormat()
-      throws YarnException, IOException, InterruptedException {
+      throws IOException {
 
     AppState response = interceptor.getAppState(null, "Application_wrong_id");
 
@@ -865,8 +856,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
   }
 
   @Test
-  public void testGetAppAttempts()
-      throws IOException, InterruptedException, YarnException {
+  public void testGetAppAttempts() throws IOException, InterruptedException {
     // Submit application to multiSubCluster
     ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
     ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
@@ -897,8 +887,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
   }
 
   @Test
-  public void testGetAppAttempt()
-      throws IOException, InterruptedException, YarnException {
+  public void testGetAppAttempt() throws IOException, InterruptedException {
 
     // Generate ApplicationId information
     ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
@@ -922,7 +911,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
   }
 
   @Test
-  public void testGetAppTimeout() throws IOException, InterruptedException, YarnException {
+  public void testGetAppTimeout() throws IOException, InterruptedException {
 
     // Generate ApplicationId information
     ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
@@ -942,7 +931,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
   }
 
   @Test
-  public void testGetAppTimeouts() throws IOException, InterruptedException, YarnException {
+  public void testGetAppTimeouts() throws IOException, InterruptedException {
 
     // Generate ApplicationId information
     ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
@@ -1022,8 +1011,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
   }
 
   @Test
-  public void testGetAppPriority() throws IOException, InterruptedException,
-      YarnException {
+  public void testGetAppPriority() throws IOException, InterruptedException {
 
     // Submit application to multiSubCluster
     ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
@@ -1072,7 +1060,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
   }
 
   @Test
-  public void testGetAppQueue() throws IOException, InterruptedException, YarnException {
+  public void testGetAppQueue() throws IOException, InterruptedException {
     String queueName = "queueName";
 
     // Submit application to multiSubCluster
@@ -1090,7 +1078,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
   }
 
   @Test
-  public void testGetAppsInfoCache() throws IOException, InterruptedException, YarnException {
+  public void testGetAppsInfoCache() {
 
     AppsInfo responseGet = interceptor.getApps(
         null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
@@ -1102,7 +1090,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> appsInfoCache =
         interceptor.getAppInfosCaches();
     Assert.assertNotNull(appsInfoCache);
-    Assert.assertTrue(!appsInfoCache.isEmpty());
+    Assert.assertFalse(appsInfoCache.isEmpty());
     Assert.assertEquals(1, appsInfoCache.size());
     Assert.assertTrue(appsInfoCache.containsKey(cacheKey));
 
@@ -1113,7 +1101,6 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
 
   @Test
   public void testGetAppStatistics() throws IOException, InterruptedException, YarnException {
-    AppState appStateRUNNING = new AppState(YarnApplicationState.RUNNING.name());
 
     // Submit application to multiSubCluster
     ApplicationId appId = ApplicationId.newInstance(Time.now(), 1);
@@ -1200,6 +1187,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     Assert.assertNotNull(entity);
     Assert.assertNotNull(entity instanceof ReservationListInfo);
 
+    Assert.assertTrue(entity instanceof ReservationListInfo);
     ReservationListInfo listInfo = (ReservationListInfo) entity;
     Assert.assertNotNull(listInfo);
 
@@ -1267,6 +1255,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     Assert.assertNotNull(entity);
     Assert.assertNotNull(entity instanceof ReservationListInfo);
 
+    Assert.assertTrue(entity instanceof ReservationListInfo);
     ReservationListInfo listInfo = (ReservationListInfo) entity;
     Assert.assertNotNull(listInfo);
 
@@ -1310,6 +1299,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     Assert.assertNotNull(entity);
     Assert.assertNotNull(entity instanceof ReservationListInfo);
 
+    Assert.assertTrue(entity instanceof ReservationListInfo);
     ReservationListInfo listInfo = (ReservationListInfo) entity;
     Assert.assertNotNull(listInfo);
 
@@ -1373,8 +1363,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
        throws IOException, InterruptedException {
     ReservationSubmissionRequestInfo resSubmissionRequestInfo =
         getReservationSubmissionRequestInfo(reservationId);
-    Response response = interceptor.submitReservation(resSubmissionRequestInfo, null);
-    return response;
+    return interceptor.submitReservation(resSubmissionRequestInfo, null);
   }
 
   public static ReservationSubmissionRequestInfo getReservationSubmissionRequestInfo(
@@ -1402,15 +1391,13 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     long arrival = Time.now();
 
     // deadline by when the resource(s) must be allocated.
-    // The reason for choosing 1.05 is because this gives an integer
+    // The reason for choosing 1.05 is that this gives an integer
     // DURATION * 0.05 = 3000(ms)
     // deadline = arrival + 3000ms
     long deadline = (long) (arrival + 1.05 * DURATION);
 
-    ReservationSubmissionRequest submissionRequest = createSimpleReservationRequest(
+    return createSimpleReservationRequest(
         reservationId, numContainers, arrival, deadline, DURATION, memory, vcore);
-
-    return submissionRequest;
   }
 
   public static ReservationSubmissionRequest createSimpleReservationRequest(
@@ -1423,9 +1410,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
         Collections.singletonList(r), ReservationRequestInterpreter.R_ALL);
     ReservationDefinition rDef = ReservationDefinition.newInstance(
         arrival, deadline, reqs, "testClientRMService#reservation", "0", Priority.UNDEFINED);
-    ReservationSubmissionRequest request = ReservationSubmissionRequest.newInstance(
-        rDef, QUEUE_DEDICATED_FULL, reservationId);
-    return request;
+    return ReservationSubmissionRequest.newInstance(rDef, QUEUE_DEDICATED_FULL, reservationId);
   }
 
   @Test
@@ -1497,7 +1482,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
         interceptor.checkUserAccessToQueue(queue, userName, queueACL.name(), mockHsr);
     Assert.assertNotNull(aclInfo);
     Assert.assertTrue(aclInfo instanceof FederationRMQueueAclInfo);
-    FederationRMQueueAclInfo fedAclInfo = FederationRMQueueAclInfo.class.cast(aclInfo);
+    FederationRMQueueAclInfo fedAclInfo = (FederationRMQueueAclInfo) aclInfo;
     List<RMQueueAclInfo> aclInfos = fedAclInfo.getList();
     Assert.assertNotNull(aclInfos);
     Assert.assertEquals(4, aclInfos.size());
@@ -1513,7 +1498,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
         interceptor.checkUserAccessToQueue(queue, userName, queueACL.name(), mockHsr);
     Assert.assertNotNull(aclInfo);
     Assert.assertTrue(aclInfo instanceof FederationRMQueueAclInfo);
-    FederationRMQueueAclInfo fedAclInfo = FederationRMQueueAclInfo.class.cast(aclInfo);
+    FederationRMQueueAclInfo fedAclInfo = (FederationRMQueueAclInfo) aclInfo;
     List<RMQueueAclInfo> aclInfos = fedAclInfo.getList();
     Assert.assertNotNull(aclInfos);
     Assert.assertEquals(4, aclInfos.size());
@@ -1589,13 +1574,12 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     Assert.assertTrue(typeInfo instanceof FederationSchedulerTypeInfo);
 
     FederationSchedulerTypeInfo federationSchedulerTypeInfo =
-        FederationSchedulerTypeInfo.class.cast(typeInfo);
+        (FederationSchedulerTypeInfo) typeInfo;
     Assert.assertNotNull(federationSchedulerTypeInfo);
     List<SchedulerTypeInfo> schedulerTypeInfos = federationSchedulerTypeInfo.getList();
     Assert.assertNotNull(schedulerTypeInfos);
     Assert.assertEquals(4, schedulerTypeInfos.size());
-    List<String> subClusterIds =
-        subClusters.stream().map(subClusterId -> subClusterId.getId()).
+    List<String> subClusterIds = subClusters.stream().map(SubClusterId::getId).
         collect(Collectors.toList());
 
     for (SchedulerTypeInfo schedulerTypeInfo : schedulerTypeInfos) {
@@ -1609,8 +1593,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
       SchedulerInfo schedulerInfo = schedulerTypeInfo.getSchedulerInfo();
       Assert.assertNotNull(schedulerInfo);
       Assert.assertTrue(schedulerInfo instanceof CapacitySchedulerInfo);
-      CapacitySchedulerInfo capacitySchedulerInfo =
-          CapacitySchedulerInfo.class.cast(schedulerInfo);
+      CapacitySchedulerInfo capacitySchedulerInfo = (CapacitySchedulerInfo) schedulerInfo;
       Assert.assertNotNull(capacitySchedulerInfo);
 
       // 3. The parent queue name should be root
@@ -1702,7 +1685,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     Assert.assertNotNull(entity);
     Assert.assertTrue(entity instanceof DelegationToken);
 
-    DelegationToken dtoken = DelegationToken.class.cast(entity);
+    DelegationToken dtoken = (DelegationToken) entity;
     Assert.assertEquals(TEST_RENEWER, dtoken.getRenewer());
     Assert.assertEquals(TEST_RENEWER, dtoken.getOwner());
     Assert.assertEquals("RM_DELEGATION_TOKEN", dtoken.getKind());
@@ -1751,7 +1734,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     Object entity = response.getEntity();
     Assert.assertNotNull(entity);
     Assert.assertTrue(entity instanceof DelegationToken);
-    DelegationToken dtoken = DelegationToken.class.cast(entity);
+    DelegationToken dtoken = (DelegationToken) entity;
 
     final String yarnTokenHeader = "Hadoop-YARN-RM-Delegation-Token";
     when(request.getHeader(yarnTokenHeader)).thenReturn(dtoken.getToken());
@@ -1764,7 +1747,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     Assert.assertTrue(renewEntity instanceof DelegationToken);
 
     // renewDelegation, we only return renewDate, other values are NULL.
-    DelegationToken renewDToken = DelegationToken.class.cast(renewEntity);
+    DelegationToken renewDToken = (DelegationToken) renewEntity;
     Assert.assertNull(renewDToken.getRenewer());
     Assert.assertNull(renewDToken.getOwner());
     Assert.assertNull(renewDToken.getKind());
@@ -1789,7 +1772,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     Object entity = response.getEntity();
     Assert.assertNotNull(entity);
     Assert.assertTrue(entity instanceof DelegationToken);
-    DelegationToken dtoken = DelegationToken.class.cast(entity);
+    DelegationToken dtoken = (DelegationToken) entity;
 
     final String yarnTokenHeader = "Hadoop-YARN-RM-Delegation-Token";
     when(request.getHeader(yarnTokenHeader)).thenReturn(dtoken.getToken());
@@ -1903,7 +1886,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     // We cannot guarantee the calling order of the sub-clusters,
     // We guarantee that the returned result contains the information of each subCluster.
     Assert.assertNotNull(dumpSchedulerLogsMsg);
-    subClusters.stream().forEach(subClusterId -> {
+    subClusters.forEach(subClusterId -> {
       String subClusterMsg =
           "subClusterId" + subClusterId + " : Capacity scheduler logs are being created.; ";
       Assert.assertTrue(dumpSchedulerLogsMsg.contains(subClusterMsg));
@@ -1978,7 +1961,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     Assert.assertTrue(bulkActivitiesInfo instanceof FederationBulkActivitiesInfo);
 
     FederationBulkActivitiesInfo federationBulkActivitiesInfo =
-        FederationBulkActivitiesInfo.class.cast(bulkActivitiesInfo);
+        (FederationBulkActivitiesInfo) bulkActivitiesInfo;
     Assert.assertNotNull(federationBulkActivitiesInfo);
 
     List<BulkActivitiesInfo> activitiesInfos = federationBulkActivitiesInfo.getList();
@@ -2033,9 +2016,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     // we confirm the result by contains
     String expectedMsg =
         "SubCluster-0:SUCCESS,SubCluster-1:SUCCESS,SubCluster-2:SUCCESS,SubCluster-3:SUCCESS";
-    Arrays.stream(entities).forEach(item -> {
-      Assert.assertTrue(expectedMsg.contains(item));
-    });
+    Arrays.stream(entities).forEach(item -> Assert.assertTrue(expectedMsg.contains(item)));
   }
 
   @Test
@@ -2098,9 +2079,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     // we confirm the result by contains
     String expectedMsg =
         "SubCluster-0:SUCCESS,SubCluster-1:SUCCESS,SubCluster-2:SUCCESS,SubCluster-3:SUCCESS";
-    Arrays.stream(entities).forEach(item -> {
-      Assert.assertTrue(expectedMsg.contains(item));
-    });
+    Arrays.stream(entities).forEach(item -> Assert.assertTrue(expectedMsg.contains(item)));
   }
 
   @Test

Some files were not shown because too many files changed in this diff