فهرست منبع

YARN-11537. [Addendum][Federation] Router CLI Supports List SubClusterPolicyConfiguration Of Queues. (#6121) Contributed by Shilun Fan.

Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
slfan1989 1 سال پیش
والد
کامیت
5f47f091a2

+ 74 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java

@@ -130,7 +130,7 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
     ThreadFactory threadFactory = new ThreadFactoryBuilder()
     ThreadFactory threadFactory = new ThreadFactoryBuilder()
         .setNameFormat("RPC Router RMAdminClient-" + userName + "-%d ").build();
         .setNameFormat("RPC Router RMAdminClient-" + userName + "-%d ").build();
 
 
-    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
+    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
     this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
     this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
         0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
         0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
 
 
@@ -1032,7 +1032,7 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
     }
     }
 
 
     try {
     try {
-      QueryFederationQueuePoliciesResponse response = null;
+      QueryFederationQueuePoliciesResponse response;
 
 
       long startTime = clock.getTime();
       long startTime = clock.getTime();
       String queue = request.getQueue();
       String queue = request.getQueue();
@@ -1056,9 +1056,15 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
         // We filter by pagination.
         // We filter by pagination.
         response = filterPoliciesConfigurationsByQueues(queues, policiesConfigurations,
         response = filterPoliciesConfigurationsByQueues(queues, policiesConfigurations,
             pageSize, currentPage);
             pageSize, currentPage);
+      } else {
+        // If we don't have any filtering criteria, we should also support paginating the results.
+        response = filterPoliciesConfigurations(policiesConfigurations, pageSize, currentPage);
       }
       }
       long stopTime = clock.getTime();
       long stopTime = clock.getTime();
       routerMetrics.succeededListFederationQueuePoliciesRetrieved(stopTime - startTime);
       routerMetrics.succeededListFederationQueuePoliciesRetrieved(stopTime - startTime);
+      if (response == null) {
+        response = QueryFederationQueuePoliciesResponse.newInstance();
+      }
       return response;
       return response;
     } catch (Exception e) {
     } catch (Exception e) {
       routerMetrics.incrListFederationQueuePoliciesFailedRetrieved();
       routerMetrics.incrListFederationQueuePoliciesFailedRetrieved();
@@ -1137,12 +1143,75 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
       }
       }
     }
     }
 
 
+    // Step3. To paginate the returned results.
+    return queryFederationQueuePoliciesPagination(federationQueueWeights, pageSize, currentPage);
+  }
+
+  /**
+   * Filter PoliciesConfigurations, and we paginate Policies within this method.
+   *
+   * @param policiesConfigurations policy configurations.
+   * @param pageSize Items per page.
+   * @param currentPage The number of pages to be queried.
+   * @return federation queue policies response.
+   * @throws YarnException indicates exceptions from yarn servers.
+   */
+  private QueryFederationQueuePoliciesResponse filterPoliciesConfigurations(
+      Map<String, SubClusterPolicyConfiguration> policiesConfigurations,
+      int pageSize, int currentPage) throws YarnException {
+
+    // Step1. Check the parameters, if the policy list is empty, return empty directly.
+    if (MapUtils.isEmpty(policiesConfigurations)) {
+      return null;
+    }
+
+    // Step2. Traverse policiesConfigurations and obtain the FederationQueueWeight list.
+    List<FederationQueueWeight> federationQueueWeights = new ArrayList<>();
+    for (Map.Entry<String, SubClusterPolicyConfiguration> entry :
+        policiesConfigurations.entrySet()) {
+      String queue = entry.getKey();
+      SubClusterPolicyConfiguration policyConf = entry.getValue();
+      if (policyConf == null) {
+        continue;
+      }
+      FederationQueueWeight federationQueueWeight = parseFederationQueueWeight(queue, policyConf);
+      if (federationQueueWeight != null) {
+        federationQueueWeights.add(federationQueueWeight);
+      }
+    }
+
+    // Step3. To paginate the returned results.
+    return queryFederationQueuePoliciesPagination(federationQueueWeights, pageSize, currentPage);
+  }
+
+  /**
+   * Pagination for FederationQueuePolicies.
+   *
+   * @param queueWeights List Of FederationQueueWeight.
+   * @param pageSize Items per page.
+   * @param currentPage The number of pages to be queried.
+   * @return federation queue policies response.
+   * @throws YarnException indicates exceptions from yarn servers.
+   */
+  private QueryFederationQueuePoliciesResponse queryFederationQueuePoliciesPagination(
+      List<FederationQueueWeight> queueWeights, int pageSize, int currentPage)
+      throws YarnException {
+    if (CollectionUtils.isEmpty(queueWeights)) {
+      return null;
+    }
+
     int startIndex = (currentPage - 1) * pageSize;
     int startIndex = (currentPage - 1) * pageSize;
-    int endIndex = Math.min(startIndex + pageSize, federationQueueWeights.size());
+    int endIndex = Math.min(startIndex + pageSize, queueWeights.size());
+
+    if (startIndex > endIndex) {
+      throw new YarnException("The index of the records to be retrieved " +
+          "has exceeded the maximum index.");
+    }
+
     List<FederationQueueWeight> subFederationQueueWeights =
     List<FederationQueueWeight> subFederationQueueWeights =
-        federationQueueWeights.subList(startIndex, endIndex);
+        queueWeights.subList(startIndex, endIndex);
 
 
-    int totalSize = federationQueueWeights.size();
+    int totalSize = queueWeights.size();
     int totalPage =
     int totalPage =
         (totalSize % pageSize == 0) ? totalSize / pageSize : (totalSize / pageSize) + 1;
         (totalSize % pageSize == 0) ? totalSize / pageSize : (totalSize / pageSize) + 1;
 
 

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java

@@ -965,5 +965,28 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest {
     List<FederationQueueWeight> federationQueueWeights6 = response6.getFederationQueueWeights();
     List<FederationQueueWeight> federationQueueWeights6 = response6.getFederationQueueWeights();
     assertNotNull(federationQueueWeights6);
     assertNotNull(federationQueueWeights6);
     assertEquals(1, federationQueueWeights6.size());
     assertEquals(1, federationQueueWeights6.size());
+
+    // Queue7: We design such a test case, we do not set any filter conditions,
+    // but we need to get the return results
+    QueryFederationQueuePoliciesRequest request7 =
+        QueryFederationQueuePoliciesRequest.newInstance(10, 1, null, null);
+    QueryFederationQueuePoliciesResponse response7 =
+        interceptor.listFederationQueuePolicies(request7);
+    assertNotNull(response7);
+    assertEquals(1, response7.getCurrentPage());
+    assertEquals(10, response7.getPageSize());
+    assertEquals(3, response7.getTotalPage());
+    assertEquals(26, response7.getTotalSize());
+    List<FederationQueueWeight> federationQueueWeights7 = response7.getFederationQueueWeights();
+    assertNotNull(federationQueueWeights7);
+    assertEquals(10, federationQueueWeights7.size());
+
+    // Queue8: We are designing a unit test where the number of records
+    // we need to retrieve exceeds the maximum number of Policies available.
+    QueryFederationQueuePoliciesRequest request8 =
+        QueryFederationQueuePoliciesRequest.newInstance(10, 10, null, null);
+    LambdaTestUtils.intercept(YarnException.class,
+        "The index of the records to be retrieved has exceeded the maximum index.",
+        () -> interceptor.listFederationQueuePolicies(request8));
   }
   }
 }
 }