|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.hadoop.yarn.server.router.rmadmin;
|
|
|
|
|
|
import org.apache.commons.collections.CollectionUtils;
|
|
|
+import org.apache.commons.collections.MapUtils;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.apache.hadoop.classification.VisibleForTesting;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -68,6 +69,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePoli
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
|
|
|
+import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest;
|
|
|
+import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse;
|
|
|
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
|
|
@@ -84,6 +87,7 @@ import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
import java.util.List;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Map;
|
|
@@ -995,6 +999,213 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
|
|
|
throw new YarnException("Unable to batchSaveFederationQueuePolicies.");
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * List the Queue Policies for the Federation.
|
|
|
+ *
|
|
|
+ * @param request QueryFederationQueuePolicies Request.
|
|
|
+ * @return QueryFederationQueuePolicies Response.
|
|
|
+ *
|
|
|
+ * @throws YarnException indicates exceptions from yarn servers.
|
|
|
+ * @throws IOException io error occurs.
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public QueryFederationQueuePoliciesResponse listFederationQueuePolicies(
|
|
|
+ QueryFederationQueuePoliciesRequest request) throws YarnException, IOException {
|
|
|
+
|
|
|
+ // Parameter validation.
|
|
|
+ if (request == null) {
|
|
|
+ routerMetrics.incrListFederationQueuePoliciesFailedRetrieved();
|
|
|
+ RouterServerUtil.logAndThrowException(
|
|
|
+ "Missing ListFederationQueuePolicies Request.", null);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (request.getPageSize() <= 0) {
|
|
|
+ routerMetrics.incrListFederationQueuePoliciesFailedRetrieved();
|
|
|
+ RouterServerUtil.logAndThrowException(
|
|
|
+ "PageSize cannot be negative or zero.", null);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (request.getCurrentPage() <= 0) {
|
|
|
+ routerMetrics.incrListFederationQueuePoliciesFailedRetrieved();
|
|
|
+ RouterServerUtil.logAndThrowException(
|
|
|
+ "CurrentPage cannot be negative or zero.", null);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ QueryFederationQueuePoliciesResponse response = null;
|
|
|
+
|
|
|
+ long startTime = clock.getTime();
|
|
|
+ String queue = request.getQueue();
|
|
|
+ List<String> queues = request.getQueues();
|
|
|
+ int currentPage = request.getCurrentPage();
|
|
|
+ int pageSize = request.getPageSize();
|
|
|
+
|
|
|
+ // Print log
|
|
|
+ LOG.info("queue = {}, queues={}, currentPage={}, pageSize={}",
|
|
|
+ queue, queues, currentPage, pageSize);
|
|
|
+
|
|
|
+ Map<String, SubClusterPolicyConfiguration> policiesConfigurations =
|
|
|
+ federationFacade.getPoliciesConfigurations();
|
|
|
+
|
|
|
+ // If the queue is not empty, filter according to the queue.
|
|
|
+ if (StringUtils.isNotBlank(queue)) {
|
|
|
+ response = filterPoliciesConfigurationsByQueue(queue, policiesConfigurations,
|
|
|
+ pageSize, currentPage);
|
|
|
+ } else if(CollectionUtils.isNotEmpty(queues)) {
|
|
|
+ // If queues are not empty, filter by queues, which may return multiple results.
|
|
|
+ // We filter by pagination.
|
|
|
+ response = filterPoliciesConfigurationsByQueues(queues, policiesConfigurations,
|
|
|
+ pageSize, currentPage);
|
|
|
+ }
|
|
|
+ long stopTime = clock.getTime();
|
|
|
+ routerMetrics.succeededListFederationQueuePoliciesRetrieved(stopTime - startTime);
|
|
|
+ return response;
|
|
|
+ } catch (Exception e) {
|
|
|
+ routerMetrics.incrListFederationQueuePoliciesFailedRetrieved();
|
|
|
+ RouterServerUtil.logAndThrowException(e,
|
|
|
+ "Unable to ListFederationQueuePolicies due to exception. " + e.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ routerMetrics.incrListFederationQueuePoliciesFailedRetrieved();
|
|
|
+ throw new YarnException("Unable to listFederationQueuePolicies.");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * According to the configuration information of the queue filtering queue,
|
|
|
+ * this part should only return 1 result.
|
|
|
+ *
|
|
|
+ * @param queue queueName.
|
|
|
+ * @param policiesConfigurations policy configurations.
|
|
|
+ * @param pageSize Items per page.
|
|
|
+ * @param currentPage The number of pages to be queried.
|
|
|
+ * @return federation queue policies response.
|
|
|
+ * @throws YarnException indicates exceptions from yarn servers.
|
|
|
+ *
|
|
|
+ */
|
|
|
+ private QueryFederationQueuePoliciesResponse filterPoliciesConfigurationsByQueue(String queue,
|
|
|
+ Map<String, SubClusterPolicyConfiguration> policiesConfigurations,
|
|
|
+ int pageSize, int currentPage) throws YarnException {
|
|
|
+
|
|
|
+ // Step1. Check the parameters, if the policy list is empty, return empty directly.
|
|
|
+ if (MapUtils.isEmpty(policiesConfigurations)) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ SubClusterPolicyConfiguration policyConf = policiesConfigurations.getOrDefault(queue, null);
|
|
|
+ if(policyConf == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Step2. Parse the parameters.
|
|
|
+ List<FederationQueueWeight> federationQueueWeights = new ArrayList<>();
|
|
|
+ FederationQueueWeight federationQueueWeight = parseFederationQueueWeight(queue, policyConf);
|
|
|
+ federationQueueWeights.add(federationQueueWeight);
|
|
|
+
|
|
|
+ // Step3. Return result.
|
|
|
+ return QueryFederationQueuePoliciesResponse.newInstance(
|
|
|
+ 1, 1, currentPage, pageSize, federationQueueWeights);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Filter queue configuration information based on the queue list.
|
|
|
+ *
|
|
|
+ * @param queues The name of the queue.
|
|
|
+ * @param policiesConfigurations policy configurations.
|
|
|
+ * @param pageSize Items per page.
|
|
|
+ * @param currentPage The number of pages to be queried.
|
|
|
+ * @return federation queue policies response.
|
|
|
+ * @throws YarnException indicates exceptions from yarn servers.
|
|
|
+ */
|
|
|
+ private QueryFederationQueuePoliciesResponse filterPoliciesConfigurationsByQueues(
|
|
|
+ List<String> queues, Map<String, SubClusterPolicyConfiguration> policiesConfigurations,
|
|
|
+ int pageSize, int currentPage) throws YarnException {
|
|
|
+
|
|
|
+ // Step1. Check the parameters, if the policy list is empty, return empty directly.
|
|
|
+ if (MapUtils.isEmpty(policiesConfigurations)) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Step2. Filtering for Queue Policies.
|
|
|
+ List<FederationQueueWeight> federationQueueWeights = new ArrayList<>();
|
|
|
+ for (String queue : queues) {
|
|
|
+ SubClusterPolicyConfiguration policyConf = policiesConfigurations.getOrDefault(queue, null);
|
|
|
+ if(policyConf == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ FederationQueueWeight federationQueueWeight = parseFederationQueueWeight(queue, policyConf);
|
|
|
+ if (federationQueueWeight != null) {
|
|
|
+ federationQueueWeights.add(federationQueueWeight);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ int startIndex = (currentPage - 1) * pageSize;
|
|
|
+ int endIndex = Math.min(startIndex + pageSize, federationQueueWeights.size());
|
|
|
+ List<FederationQueueWeight> subFederationQueueWeights =
|
|
|
+ federationQueueWeights.subList(startIndex, endIndex);
|
|
|
+
|
|
|
+ int totalSize = federationQueueWeights.size();
|
|
|
+ int totalPage =
|
|
|
+ (totalSize % pageSize == 0) ? totalSize / pageSize : (totalSize / pageSize) + 1;
|
|
|
+
|
|
|
+ // Step3. Returns the Queue Policies result.
|
|
|
+ return QueryFederationQueuePoliciesResponse.newInstance(
|
|
|
+ totalSize, totalPage, currentPage, pageSize, subFederationQueueWeights);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Parses a FederationQueueWeight from the given queue and SubClusterPolicyConfiguration.
|
|
|
+ *
|
|
|
+ * @param queue The name of the queue.
|
|
|
+ * @param policyConf policy configuration.
|
|
|
+ * @return Queue weights for representing Federation.
|
|
|
+ * @throws YarnException YarnException indicates exceptions from yarn servers.
|
|
|
+ */
|
|
|
+ private FederationQueueWeight parseFederationQueueWeight(String queue,
|
|
|
+ SubClusterPolicyConfiguration policyConf) throws YarnException {
|
|
|
+
|
|
|
+ if (policyConf != null) {
|
|
|
+ ByteBuffer params = policyConf.getParams();
|
|
|
+ WeightedPolicyInfo weightedPolicyInfo = WeightedPolicyInfo.fromByteBuffer(params);
|
|
|
+ Map<SubClusterIdInfo, Float> amrmPolicyWeights = weightedPolicyInfo.getAMRMPolicyWeights();
|
|
|
+ Map<SubClusterIdInfo, Float> routerPolicyWeights =
|
|
|
+ weightedPolicyInfo.getRouterPolicyWeights();
|
|
|
+ float headroomAlpha = weightedPolicyInfo.getHeadroomAlpha();
|
|
|
+ String policyManagerClassName = policyConf.getType();
|
|
|
+
|
|
|
+ String amrmPolicyWeight = parsePolicyWeights(amrmPolicyWeights);
|
|
|
+ String routerPolicyWeight = parsePolicyWeights(routerPolicyWeights);
|
|
|
+
|
|
|
+ FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amrmPolicyWeight);
|
|
|
+ FederationQueueWeight.checkSubClusterQueueWeightRatioValid(routerPolicyWeight);
|
|
|
+
|
|
|
+ return FederationQueueWeight.newInstance(routerPolicyWeight, amrmPolicyWeight,
|
|
|
+ String.valueOf(headroomAlpha), queue, policyManagerClassName);
|
|
|
+ }
|
|
|
+
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Parses the policy weights from the provided policyWeights map.
|
|
|
+ * returns a string similar to the following:
|
|
|
+ * SC-1:0.7,SC-2:0.3
|
|
|
+ *
|
|
|
+ * @param policyWeights
|
|
|
+ * A map containing SubClusterIdInfo as keys and corresponding weight values.
|
|
|
+ * @return A string representation of the parsed policy weights.
|
|
|
+ */
|
|
|
+ protected String parsePolicyWeights(Map<SubClusterIdInfo, Float> policyWeights) {
|
|
|
+ if (MapUtils.isEmpty(policyWeights)) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ List<String> policyWeightList = new ArrayList<>();
|
|
|
+ for (Map.Entry<SubClusterIdInfo, Float> entry : policyWeights.entrySet()) {
|
|
|
+ SubClusterIdInfo key = entry.getKey();
|
|
|
+ Float value = entry.getValue();
|
|
|
+ policyWeightList.add(key.toId() + ":" + value);
|
|
|
+ }
|
|
|
+ return StringUtils.join(policyWeightList, ",");
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Save FederationQueuePolicy.
|
|
|
*
|