浏览代码

YARN-11536. [Federation] Router CLI Supports Batch Save the SubClusterPolicyConfiguration Of Queues. (#5862) Contributed by Shilun Fan.

Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
slfan1989 1 年之前
父节点
当前提交
df0381f236
共有 26 个文件被更改,包括 1330 次插入13 次删除
  1. 15 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
  2. 53 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/BatchSaveFederationQueuePoliciesRequest.java
  3. 47 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/BatchSaveFederationQueuePoliciesResponse.java
  4. 41 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java
  5. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto
  6. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
  7. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
  8. 242 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java
  9. 58 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/MemoryPageUtils.java
  10. 59 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestMemoryPageUtils.java
  11. 23 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java
  12. 74 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/federation-weights.xml
  13. 19 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java
  14. 23 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java
  15. 149 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/BatchSaveFederationQueuePoliciesRequestPBImpl.java
  16. 99 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/BatchSaveFederationQueuePoliciesResponsePBImpl.java
  17. 40 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/FederationQueueWeightPBImpl.java
  18. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
  19. 33 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
  20. 33 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
  21. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
  22. 98 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
  23. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java
  24. 36 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
  25. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java
  26. 144 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java

+ 15 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java

@@ -60,7 +60,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
-
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
 
 @Private
 public interface ResourceManagerAdministrationProtocol extends GetUserMappingsProtocol {
@@ -189,4 +190,17 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
   @Idempotent
   SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
       SaveFederationQueuePolicyRequest request) throws YarnException, IOException;
+
+  /**
+   * In YARN-Federation mode, this method provides a way to save queue policies in batches.
+   *
+   * @param request BatchSaveFederationQueuePolicies Request
+   * @return Response from batchSaveFederationQueuePolicies.
+   * @throws YarnException exceptions from yarn servers.
+   * @throws IOException if an IO error occurred.
+   */
+  @Private
+  @Idempotent
+  BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies(
+      BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException;
 }

+ 53 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/BatchSaveFederationQueuePoliciesRequest.java

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+
+/**
+ * In Federation mode,
+ * we will support batch save queues policies to FederationStateStore.
+ */
+@Private
+@Unstable
+public abstract class BatchSaveFederationQueuePoliciesRequest {
+
+  @Private
+  @Unstable
+  public static BatchSaveFederationQueuePoliciesRequest newInstance(
+      List<FederationQueueWeight> federationQueueWeights) {
+    BatchSaveFederationQueuePoliciesRequest request =
+        Records.newRecord(BatchSaveFederationQueuePoliciesRequest.class);
+    request.setFederationQueueWeights(federationQueueWeights);
+    return request;
+  }
+
+  @Public
+  @Unstable
+  public abstract List<FederationQueueWeight> getFederationQueueWeights();
+
+  @Private
+  @Unstable
+  public abstract void setFederationQueueWeights(
+      List<FederationQueueWeight> federationQueueWeights);
+}

+ 47 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/BatchSaveFederationQueuePoliciesResponse.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+@Private
+@Unstable
+public abstract class BatchSaveFederationQueuePoliciesResponse {
+
+  public static BatchSaveFederationQueuePoliciesResponse newInstance() {
+    return Records.newRecord(BatchSaveFederationQueuePoliciesResponse.class);
+  }
+
+  public static BatchSaveFederationQueuePoliciesResponse newInstance(String msg) {
+    BatchSaveFederationQueuePoliciesResponse response =
+        Records.newRecord(BatchSaveFederationQueuePoliciesResponse.class);
+    response.setMessage(msg);
+    return response;
+  }
+
+  @Public
+  @Unstable
+  public abstract String getMessage();
+
+  @Public
+  @Unstable
+  public abstract void setMessage(String msg);
+}

+ 41 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java

@@ -75,6 +75,19 @@ public abstract class FederationQueueWeight {
     return federationQueueWeight;
   }
 
+  @Private
+  @Unstable
+  public static FederationQueueWeight newInstance(String routerWeight,
+      String amrmWeight, String headRoomAlpha, String queue, String policyManagerClassName) {
+    FederationQueueWeight federationQueueWeight = Records.newRecord(FederationQueueWeight.class);
+    federationQueueWeight.setRouterWeight(routerWeight);
+    federationQueueWeight.setAmrmWeight(amrmWeight);
+    federationQueueWeight.setHeadRoomAlpha(headRoomAlpha);
+    federationQueueWeight.setQueue(queue);
+    federationQueueWeight.setPolicyManagerClassName(policyManagerClassName);
+    return federationQueueWeight;
+  }
+
   @Public
   @Unstable
   public abstract String getRouterWeight();
@@ -166,4 +179,32 @@ public abstract class FederationQueueWeight {
   protected static boolean isNumeric(String value) {
     return NumberUtils.isCreatable(value);
   }
+
+  @Public
+  @Unstable
+  public abstract String getQueue();
+
+  @Public
+  @Unstable
+  public abstract void setQueue(String queue);
+
+  @Public
+  @Unstable
+  public abstract String getPolicyManagerClassName();
+
+  @Public
+  @Unstable
+  public abstract void setPolicyManagerClassName(String policyManagerClassName);
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("FederationQueueWeight { ");
+    builder.append("Queue: ").append(getQueue()).append(", ");
+    builder.append("RouterWeight: ").append(getRouterWeight()).append(", ");
+    builder.append("AmrmWeight: ").append(getAmrmWeight()).append(", ");
+    builder.append("PolicyManagerClassName: ").append(getPolicyManagerClassName());
+    builder.append(" }");
+    return builder.toString();
+  }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto

@@ -49,4 +49,5 @@ service ResourceManagerAdministrationProtocolService {
   rpc mapAttributesToNodes(NodesToAttributesMappingRequestProto) returns (NodesToAttributesMappingResponseProto);
   rpc deregisterSubCluster(DeregisterSubClusterRequestProto) returns (DeregisterSubClusterResponseProto);
   rpc saveFederationQueuePolicy(SaveFederationQueuePolicyRequestProto) returns (SaveFederationQueuePolicyResponseProto);
+  rpc batchSaveFederationQueuePolicies(BatchSaveFederationQueuePoliciesRequestProto) returns (BatchSaveFederationQueuePoliciesResponseProto);
 }

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto

@@ -180,6 +180,14 @@ message SaveFederationQueuePolicyResponseProto {
   required string message = 1;
 }
 
+message BatchSaveFederationQueuePoliciesRequestProto {
+  repeated FederationQueueWeightProto federationQueueWeights = 1;
+}
+
+message BatchSaveFederationQueuePoliciesResponseProto {
+  required string message = 1;
+}
+
 //////////////////////////////////////////////////////////////////
 ///////////// RM Failover related records ////////////////////////
 //////////////////////////////////////////////////////////////////

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

@@ -444,6 +444,8 @@ message FederationQueueWeightProto {
   optional string routerWeight = 1;
   optional string amrmWeight = 2;
   optional string headRoomAlpha = 3;
+  optional string queue = 4;
+  optional string policyManagerClassName = 5;
 }
 
 ////////////////////////////////////////////////////////////////////////

+ 242 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.util.FormattingCLIUtils;
+import org.apache.hadoop.yarn.client.util.MemoryPageUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
@@ -40,16 +41,28 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRes
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -71,7 +84,8 @@ public class RouterCLI extends Configured implements Tool {
         "set the state of the subCluster to SC_LOST."))
          // Command2: policy
         .put("-policy", new UsageInfo(
-        "[-s|--save [queue;router weight;amrm weight;headroomalpha]]",
+        "[-s|--save [queue;router weight;amrm weight;headroomalpha]] " +
+         "[-bs|--batch-save [--format xml] [-f|--input-file fileName]]",
         "We provide a set of commands for Policy:" +
         " Include list policies, save policies, batch save policies. " +
         " (Note: The policy type will be directly read from the" +
@@ -102,8 +116,23 @@ public class RouterCLI extends Configured implements Tool {
   // Command2: policy
   // save policy
   private static final String OPTION_S = "s";
+  private static final String OPTION_BATCH_S = "bs";
   private static final String OPTION_SAVE = "save";
+  private static final String OPTION_BATCH_SAVE = "batch-save";
+  private static final String OPTION_FORMAT = "format";
+  private static final String OPTION_FILE = "f";
+  private static final String OPTION_INPUT_FILE = "input-file";
+
   private static final String CMD_POLICY = "-policy";
+  private static final String FORMAT_XML = "xml";
+  private static final String FORMAT_JSON = "json";
+  private static final String XML_TAG_SUBCLUSTERIDINFO = "subClusterIdInfo";
+  private static final String XML_TAG_AMRMPOLICYWEIGHTS = "amrmPolicyWeights";
+  private static final String XML_TAG_ROUTERPOLICYWEIGHTS = "routerPolicyWeights";
+  private static final String XML_TAG_HEADROOMALPHA = "headroomAlpha";
+  private static final String XML_TAG_FEDERATION_WEIGHTS = "federationWeights";
+  private static final String XML_TAG_QUEUE = "queue";
+  private static final String XML_TAG_NAME = "name";
 
   public RouterCLI() {
     super();
@@ -161,7 +190,8 @@ public class RouterCLI extends Configured implements Tool {
         .append("The full syntax is: \n\n")
         .append("routeradmin\n")
         .append("   [-deregisterSubCluster [-sc|--subClusterId [subCluster Id]]\n")
-        .append("   [-policy [-s|--save [queue;router weight;amrm weight;headroomalpha]]\n")
+        .append("   [-policy [-s|--save [queue;router weight;amrm weight;headroomalpha] " +
+                "[-bs|--batch-save [--format xml,json] [-f|--input-file fileName]]]\n")
         .append("   [-help [cmd]]").append("\n");
     StringBuilder helpBuilder = new StringBuilder();
     System.out.println(summary);
@@ -304,7 +334,23 @@ public class RouterCLI extends Configured implements Tool {
         "We will save the policy information of the queue, " +
         "including queue and weight information");
     saveOpt.setOptionalArg(true);
+    Option batchSaveOpt = new Option(OPTION_BATCH_S, OPTION_BATCH_SAVE, false,
+        "We will save queue policies in bulk, " +
+         "where users can provide XML or JSON files containing the policies. " +
+         "This command will parse the file contents and store the results " +
+         "in the FederationStateStore.");
+    Option formatOpt = new Option(null, "format", true,
+        "Users can specify the file format using this option. " +
+        "Currently, there are one supported file formats: XML." +
+        "These files contain the policy information for storing queue policies.");
+    Option fileOpt = new Option("f", "input-file", true,
+        "The location of the input configuration file. ");
+    formatOpt.setOptionalArg(true);
+
     opts.addOption(saveOpt);
+    opts.addOption(batchSaveOpt);
+    opts.addOption(formatOpt);
+    opts.addOption(fileOpt);
 
     // Parse command line arguments.
     CommandLine cliParser;
@@ -317,12 +363,42 @@ public class RouterCLI extends Configured implements Tool {
     }
 
     // Try to parse the cmd save.
+    // Save a single queue policy
     if (cliParser.hasOption(OPTION_S) || cliParser.hasOption(OPTION_SAVE)) {
       String policy = cliParser.getOptionValue(OPTION_S);
       if (StringUtils.isBlank(policy)) {
         policy = cliParser.getOptionValue(OPTION_SAVE);
       }
       return handleSavePolicy(policy);
+    } else if (cliParser.hasOption(OPTION_BATCH_S) || cliParser.hasOption(OPTION_BATCH_SAVE)) {
+      // Save Queue Policies in Batches
+      // Determine whether the file format is accurate, XML or JSON format.
+      // If it is not XML or JSON, we will directly prompt the user with an error message.
+      String format = null;
+      if (cliParser.hasOption(OPTION_FORMAT)) {
+        format = cliParser.getOptionValue(OPTION_FORMAT);
+        if (StringUtils.isBlank(format) ||
+            !StringUtils.equalsAnyIgnoreCase(format, FORMAT_XML)) {
+          System.out.println("We currently only support policy configuration files " +
+              "in XML formats.");
+          return EXIT_ERROR;
+        }
+      }
+
+      // Parse configuration file path.
+      String filePath = null;
+      if (cliParser.hasOption(OPTION_FILE) || cliParser.hasOption(OPTION_INPUT_FILE)) {
+        filePath = cliParser.getOptionValue(OPTION_FILE);
+        if (StringUtils.isBlank(filePath)) {
+          filePath = cliParser.getOptionValue(OPTION_INPUT_FILE);
+        }
+      }
+
+      // Batch SavePolicies.
+      return handBatchSavePolicies(format, filePath);
+    } else {
+      // printUsage
+      printUsage(args[0]);
     }
 
     return EXIT_ERROR;
@@ -342,6 +418,30 @@ public class RouterCLI extends Configured implements Tool {
     }
   }
 
+  private int handBatchSavePolicies(String format, String policyFile) {
+
+    if(StringUtils.isBlank(format)) {
+      LOG.error("Batch Save Federation Policies. Format is Empty.");
+      return EXIT_ERROR;
+    }
+
+    if(StringUtils.isBlank(policyFile)) {
+      LOG.error("Batch Save Federation Policies. policyFile is Empty.");
+      return EXIT_ERROR;
+    }
+
+    LOG.info("Batch Save Federation Policies. Format = {}, PolicyFile = {}.",
+        format, policyFile);
+
+    switch (format) {
+    case FORMAT_XML:
+      return parseXml2PoliciesAndBatchSavePolicies(policyFile);
+    default:
+      System.out.println("We currently only support XML formats.");
+      return EXIT_ERROR;
+    }
+  }
+
   /**
    * We will parse the policy, and it has specific formatting requirements.
    *
@@ -384,6 +484,140 @@ public class RouterCLI extends Configured implements Tool {
     return request;
   }
 
+  /**
+   * Parse Policies from XML and save them in batches to FederationStateStore.
+   *
+   * We save 20 policies in one batch.
+   * If the user needs to save 1000 policies, it will cycle 50 times.
+   *
+   * Every time a page is saved, we will print whether a page
+   * has been saved successfully or failed.
+   *
+   * @param policiesXml Policies Xml Path.
+   * @return 0, success; 1, failed.
+   */
+  protected int parseXml2PoliciesAndBatchSavePolicies(String policiesXml) {
+    try {
+      List<FederationQueueWeight> federationQueueWeightsList = parsePoliciesByXml(policiesXml);
+      MemoryPageUtils<FederationQueueWeight> memoryPageUtils = new MemoryPageUtils<>(20);
+      federationQueueWeightsList.forEach(federationQueueWeight ->
+          memoryPageUtils.addToMemory(federationQueueWeight));
+      int pages = memoryPageUtils.getPages();
+      for (int i = 0; i < pages; i++) {
+        List<FederationQueueWeight> federationQueueWeights =
+            memoryPageUtils.readFromMemory(i);
+        BatchSaveFederationQueuePoliciesRequest request =
+            BatchSaveFederationQueuePoliciesRequest.newInstance(federationQueueWeights);
+        ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
+        BatchSaveFederationQueuePoliciesResponse response =
+            adminProtocol.batchSaveFederationQueuePolicies(request);
+        System.out.println("page <" + (i + 1) + "> : " + response.getMessage());
+      }
+    } catch (Exception e) {
+      LOG.error("BatchSaveFederationQueuePolicies error", e);
+    }
+    return EXIT_ERROR;
+  }
+
+  /**
+   * Parse FederationQueueWeight from the xml configuration file.
+   * <p>
+   * We allow users to provide an xml configuration file,
+   * which stores the weight information of the queue.
+   *
+   * @param policiesXml Policies Xml Path.
+   * @return FederationQueueWeight List.
+   * @throws IOException an I/O exception of some sort has occurred.
+   * @throws SAXException Encapsulate a general SAX error or warning.
+   * @throws ParserConfigurationException a serious configuration error..
+   */
+  protected List<FederationQueueWeight> parsePoliciesByXml(String policiesXml)
+      throws IOException, SAXException, ParserConfigurationException {
+
+    List<FederationQueueWeight> weights = new ArrayList<>();
+
+    File xmlFile = new File(policiesXml);
+    DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+    DocumentBuilder builder = factory.newDocumentBuilder();
+    Document document = builder.parse(xmlFile);
+
+    NodeList federationsList = document.getElementsByTagName(XML_TAG_FEDERATION_WEIGHTS);
+
+    for (int i = 0; i < federationsList.getLength(); i++) {
+
+      Node federationNode = federationsList.item(i);
+
+      if (federationNode.getNodeType() == Node.ELEMENT_NODE) {
+        Element federationElement = (Element) federationNode;
+        NodeList queueList = federationElement.getElementsByTagName(XML_TAG_QUEUE);
+
+        for (int j = 0; j < queueList.getLength(); j++) {
+
+          Node queueNode = queueList.item(j);
+          if (queueNode.getNodeType() == Node.ELEMENT_NODE) {
+            Element queueElement = (Element) queueNode;
+            // parse queueName.
+            String queueName = queueElement.getElementsByTagName(XML_TAG_NAME)
+                .item(0).getTextContent();
+
+            // parse amrmPolicyWeights / routerPolicyWeights.
+            String amrmWeight = parsePolicyWeightsNode(queueElement, XML_TAG_AMRMPOLICYWEIGHTS);
+            String routerWeight = parsePolicyWeightsNode(queueElement, XML_TAG_ROUTERPOLICYWEIGHTS);
+
+            // parse headroomAlpha.
+            String headroomAlpha = queueElement.getElementsByTagName(XML_TAG_HEADROOMALPHA)
+                .item(0).getTextContent();
+
+            String policyManager = getConf().get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+                YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
+
+            LOG.debug("Queue: {}, AmrmPolicyWeights: {}, RouterWeight: {}, HeadroomAlpha: {}.",
+                queueName, amrmWeight, routerWeight, headroomAlpha);
+
+            FederationQueueWeight weight = FederationQueueWeight.newInstance(routerWeight,
+                amrmWeight, headroomAlpha, queueName, policyManager);
+
+            weights.add(weight);
+          }
+        }
+      }
+    }
+
+    return weights;
+  }
+
+  /**
+   * We will parse the policyWeight information.
+   *
+   * @param queueElement xml Element.
+   * @param weightType weightType, including 2 types, AmrmPolicyWeight and RouterPolicyWeight.
+   * @return concatenated string of sub-cluster weights.
+   */
+  private String parsePolicyWeightsNode(Element queueElement, String weightType) {
+    NodeList amrmPolicyWeightsList = queueElement.getElementsByTagName(weightType);
+    Node amrmPolicyWeightsNode = amrmPolicyWeightsList.item(0);
+    List<String> amRmPolicyWeights = new ArrayList<>();
+    if (amrmPolicyWeightsNode.getNodeType() == Node.ELEMENT_NODE) {
+      Element amrmPolicyWeightsElement = (Element) amrmPolicyWeightsNode;
+      NodeList subClusterIdInfoList =
+          amrmPolicyWeightsElement.getElementsByTagName(XML_TAG_SUBCLUSTERIDINFO);
+      for (int i = 0; i < subClusterIdInfoList.getLength(); i++) {
+        Node subClusterIdInfoNode = subClusterIdInfoList.item(i);
+        if (subClusterIdInfoNode.getNodeType() == Node.ELEMENT_NODE) {
+          Element subClusterIdInfoElement = (Element) subClusterIdInfoNode;
+          String subClusterId =
+              subClusterIdInfoElement.getElementsByTagName("id").item(0).getTextContent();
+          String weight =
+              subClusterIdInfoElement.getElementsByTagName("weight").item(0).getTextContent();
+          LOG.debug("WeightType[{}] - SubCluster ID: {}, Weight: {}.",
+              weightType, subClusterId, weight);
+          amRmPolicyWeights.add(subClusterId + ":" + weight);
+        }
+      }
+    }
+    return StringUtils.join(amRmPolicyWeights, ",");
+  }
+
   @Override
   public int run(String[] args) throws Exception {
     YarnConfiguration yarnConf = getConf() == null ?
@@ -405,14 +639,13 @@ public class RouterCLI extends Configured implements Tool {
         printHelp();
       }
       return EXIT_SUCCESS;
-    }
-
-    if (CMD_DEREGISTERSUBCLUSTER.equals(cmd)) {
+    } else if (CMD_DEREGISTERSUBCLUSTER.equals(cmd)) {
       return handleDeregisterSubCluster(args);
-    }
-
-    if (CMD_POLICY.equals(cmd)) {
+    } else if (CMD_POLICY.equals(cmd)) {
       return handlePolicy(args);
+    } else {
+      System.out.println("No related commands found.");
+      printHelp();
     }
 
     return EXIT_SUCCESS;

+ 58 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/util/MemoryPageUtils.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.client.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This is a memory paging utility that is used to paginate a dataset.
+ *
+ * This class is designed to support batch entry queue policies.
+ */
+public class MemoryPageUtils<T> {
+  private List<T> dataList;
+  private int pageSize;
+
+  /**
+   * MemoryPageUtils constructor.
+   *
+   * @param pageSize Number of records returned per page.
+   */
+  public MemoryPageUtils(int pageSize) {
+    this.pageSize = pageSize;
+    this.dataList = new ArrayList<>();
+  }
+
+  public void addToMemory(T data) {
+    dataList.add(data);
+  }
+
+  public List<T> readFromMemory(int pageNumber) {
+    int startIndex = pageNumber * pageSize;
+    int endIndex = Math.min(startIndex + pageSize, dataList.size());
+    if (startIndex >= dataList.size()) {
+      return null;
+    }
+    return dataList.subList(startIndex, endIndex);
+  }
+
+  public int getPages() {
+    return (dataList.size() / pageSize + 1);
+  }
+}

+ 59 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestMemoryPageUtils.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.client;
+
+import org.apache.hadoop.yarn.client.util.MemoryPageUtils;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * The purpose of this class is to test
+ * whether the memory paging function is as expected.
+ */
+public class TestMemoryPageUtils {
+
+  @Test
+  public void testMemoryPage() {
+    // We design such a unit test for testing pagination, and we prepare 6 pieces of policy data.
+    // If 1 page is followed by 5 pieces of data, we will get 2 pages.
+    // Page 1 will contain 5 records and page 2 will contain 1 record.
+    MemoryPageUtils<String> policies = new MemoryPageUtils<>(5);
+    policies.addToMemory("policy-1");
+    policies.addToMemory("policy-2");
+    policies.addToMemory("policy-3");
+    policies.addToMemory("policy-4");
+    policies.addToMemory("policy-5");
+    policies.addToMemory("policy-6");
+
+    // Page 1 will return 5 records.
+    List<String> firstPage = policies.readFromMemory(0);
+    assertEquals(5, firstPage.size());
+
+    // Page 2 will return 1 records
+    List<String> secondPage = policies.readFromMemory(1);
+    assertEquals(1, secondPage.size());
+
+    // Page 10, This is a wrong number of pages, we will get null.
+    List<String> tenPage = policies.readFromMemory(10);
+    assertNull(tenPage);
+  }
+}

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java

@@ -217,4 +217,27 @@ public class TestRouterCLI {
     args = new String[]{"-policy", "-save", "root.a;SC-1:0.1,SC-2:0.9;SC-1:0.7,SC-2:0.3;1.0"};
     assertEquals(0, rmAdminCLI.run(args));
   }
+
+  @Test
+  public void testParsePoliciesByXml() throws Exception {
+    String filePath =
+        TestRouterCLI.class.getClassLoader().getResource("federation-weights.xml").getFile();
+    List<FederationQueueWeight> federationQueueWeights = rmAdminCLI.parsePoliciesByXml(filePath);
+    assertNotNull(federationQueueWeights);
+    assertEquals(2, federationQueueWeights.size());
+
+    // Queue1: root.a
+    FederationQueueWeight queueWeight1 = federationQueueWeights.get(0);
+    assertNotNull(queueWeight1);
+    assertEquals("root.a", queueWeight1.getQueue());
+    assertEquals("SC-1:0.7,SC-2:0.3", queueWeight1.getAmrmWeight());
+    assertEquals("SC-1:0.6,SC-2:0.4", queueWeight1.getRouterWeight());
+
+    // Queue2: root.b
+    FederationQueueWeight queueWeight2 = federationQueueWeights.get(1);
+    assertNotNull(queueWeight2);
+    assertEquals("root.b", queueWeight2.getQueue());
+    assertEquals("SC-1:0.8,SC-2:0.2", queueWeight2.getAmrmWeight());
+    assertEquals("SC-1:0.6,SC-2:0.4", queueWeight2.getRouterWeight());
+  }
 }

+ 74 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/federation-weights.xml

@@ -0,0 +1,74 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<federationWeights>
+    <weight>
+        <queue>
+            <name>root.a</name>
+            <amrmPolicyWeights>
+                <subClusterIdInfo>
+                    <id>SC-1</id>
+                    <weight>0.7</weight>
+                </subClusterIdInfo>
+                <subClusterIdInfo>
+                    <id>SC-2</id>
+                    <weight>0.3</weight>
+                </subClusterIdInfo>
+            </amrmPolicyWeights>
+            <routerPolicyWeights>
+                <subClusterIdInfo>
+                    <id>SC-1</id>
+                    <weight>0.6</weight>
+                </subClusterIdInfo>
+                <subClusterIdInfo>
+                    <id>SC-2</id>
+                    <weight>0.4</weight>
+                </subClusterIdInfo>
+            </routerPolicyWeights>
+            <headroomAlpha>1.0</headroomAlpha>
+        </queue>
+    </weight>
+    <weight>
+        <queue>
+            <name>root.b</name>
+            <amrmPolicyWeights>
+                <subClusterIdInfo>
+                    <id>SC-1</id>
+                    <weight>0.8</weight>
+                </subClusterIdInfo>
+                <subClusterIdInfo>
+                    <id>SC-2</id>
+                    <weight>0.2</weight>
+                </subClusterIdInfo>
+            </amrmPolicyWeights>
+            <routerPolicyWeights>
+                <subClusterIdInfo>
+                    <id>SC-1</id>
+                    <weight>0.6</weight>
+                </subClusterIdInfo>
+                <subClusterIdInfo>
+                    <id>SC-2</id>
+                    <weight>0.4</weight>
+                </subClusterIdInfo>
+            </routerPolicyWeights>
+            <headroomAlpha>1.0</headroomAlpha>
+        </queue>
+    </weight>
+</federationWeights>

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Repla
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesRequestProto;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
 import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
@@ -81,6 +82,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
@@ -113,6 +116,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubCl
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.BatchSaveFederationQueuePoliciesRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.BatchSaveFederationQueuePoliciesResponsePBImpl;
 
 import org.apache.hadoop.thirdparty.protobuf.ServiceException;
 
@@ -381,4 +386,18 @@ public class ResourceManagerAdministrationProtocolPBClientImpl implements Resour
       return null;
     }
   }
+
+  @Override
+  public BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies(
+      BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException {
+    BatchSaveFederationQueuePoliciesRequestProto requestProto =
+        ((BatchSaveFederationQueuePoliciesRequestPBImpl) request).getProto();
+    try {
+      return new BatchSaveFederationQueuePoliciesResponsePBImpl(
+          proxy.batchSaveFederationQueuePolicies(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
 }

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java

@@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto;
@@ -79,6 +81,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
@@ -111,6 +115,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubCl
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.BatchSaveFederationQueuePoliciesRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.BatchSaveFederationQueuePoliciesResponsePBImpl;
 
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
 import org.apache.hadoop.thirdparty.protobuf.ServiceException;
@@ -399,4 +405,21 @@ public class ResourceManagerAdministrationProtocolPBServiceImpl implements Resou
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public BatchSaveFederationQueuePoliciesResponseProto batchSaveFederationQueuePolicies(
+      RpcController controller, BatchSaveFederationQueuePoliciesRequestProto proto)
+      throws ServiceException {
+    BatchSaveFederationQueuePoliciesRequest request =
+        new BatchSaveFederationQueuePoliciesRequestPBImpl(proto);
+    try {
+      BatchSaveFederationQueuePoliciesResponse response =
+          real.batchSaveFederationQueuePolicies(request);
+      return ((BatchSaveFederationQueuePoliciesResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

+ 149 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/BatchSaveFederationQueuePoliciesRequestPBImpl.java

@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.thirdparty.protobuf.TextFormat;
+import org.apache.hadoop.yarn.proto.YarnProtos.FederationQueueWeightProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesRequestProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The class is responsible for batch-saving queue policies requests.
+ */
+@Private
+@Unstable
+public class BatchSaveFederationQueuePoliciesRequestPBImpl
+    extends BatchSaveFederationQueuePoliciesRequest {
+
+  private BatchSaveFederationQueuePoliciesRequestProto proto =
+      BatchSaveFederationQueuePoliciesRequestProto.getDefaultInstance();
+  private BatchSaveFederationQueuePoliciesRequestProto.Builder builder = null;
+  private boolean viaProto = false;
+  private List<FederationQueueWeight> federationQueueWeights = null;
+
+  public BatchSaveFederationQueuePoliciesRequestPBImpl() {
+    this.builder = BatchSaveFederationQueuePoliciesRequestProto.newBuilder();
+  }
+
+  public BatchSaveFederationQueuePoliciesRequestPBImpl(
+      BatchSaveFederationQueuePoliciesRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  private synchronized void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = BatchSaveFederationQueuePoliciesRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    if (this.federationQueueWeights != null) {
+      for (FederationQueueWeight federationQueueWeight : federationQueueWeights) {
+        FederationQueueWeightPBImpl federationQueueWeightPBImpl =
+            (FederationQueueWeightPBImpl) federationQueueWeight;
+        builder.addFederationQueueWeights(federationQueueWeightPBImpl.getProto());
+      }
+    }
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  public BatchSaveFederationQueuePoliciesRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private void initDeregisterSubClustersMapping() {
+    if (this.federationQueueWeights != null) {
+      return;
+    }
+
+    BatchSaveFederationQueuePoliciesRequestProtoOrBuilder p = viaProto ? proto : builder;
+    List<FederationQueueWeightProto> batchSaveFederationQueuePoliciesProtoList =
+        p.getFederationQueueWeightsList();
+
+    List<FederationQueueWeight> attributes = new ArrayList<>();
+    if (batchSaveFederationQueuePoliciesProtoList == null ||
+        batchSaveFederationQueuePoliciesProtoList.size() == 0) {
+      this.federationQueueWeights = attributes;
+      return;
+    }
+
+    for (FederationQueueWeightProto federationQueueWeightProto :
+        batchSaveFederationQueuePoliciesProtoList) {
+      attributes.add(new FederationQueueWeightPBImpl(federationQueueWeightProto));
+    }
+
+    this.federationQueueWeights = attributes;
+  }
+
+  @Override
+  public List<FederationQueueWeight> getFederationQueueWeights() {
+    initDeregisterSubClustersMapping();
+    return this.federationQueueWeights;
+  }
+
+  @Override
+  public void setFederationQueueWeights(List<FederationQueueWeight> pFederationQueueWeights) {
+    if (federationQueueWeights == null) {
+      federationQueueWeights = new ArrayList<>();
+    }
+    if(federationQueueWeights == null) {
+      throw new IllegalArgumentException("federationQueueWeights cannot be null");
+    }
+    federationQueueWeights.clear();
+    federationQueueWeights.addAll(pFederationQueueWeights);
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof BatchSaveFederationQueuePoliciesRequest)) {
+      return false;
+    }
+
+    BatchSaveFederationQueuePoliciesRequestPBImpl otherImpl = this.getClass().cast(other);
+    return new EqualsBuilder()
+        .append(this.getProto(), otherImpl.getProto())
+        .isEquals();
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+}

+ 99 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/BatchSaveFederationQueuePoliciesResponsePBImpl.java

@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.thirdparty.protobuf.TextFormat;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
+
+/**
+ * The class is responsible for batch-saving queue policies responses.
+ */
+public class BatchSaveFederationQueuePoliciesResponsePBImpl
+    extends BatchSaveFederationQueuePoliciesResponse {
+
+  private BatchSaveFederationQueuePoliciesResponseProto proto =
+      BatchSaveFederationQueuePoliciesResponseProto.getDefaultInstance();
+  private BatchSaveFederationQueuePoliciesResponseProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  public BatchSaveFederationQueuePoliciesResponsePBImpl() {
+    builder = BatchSaveFederationQueuePoliciesResponseProto.newBuilder();
+  }
+
+  public BatchSaveFederationQueuePoliciesResponsePBImpl(
+      BatchSaveFederationQueuePoliciesResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  public BatchSaveFederationQueuePoliciesResponseProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public String getMessage() {
+    BatchSaveFederationQueuePoliciesResponseProtoOrBuilder p = viaProto ? proto : builder;
+    boolean hasMessage = p.hasMessage();
+    if (hasMessage) {
+      return p.getMessage();
+    }
+    return null;
+  }
+
+  @Override
+  public void setMessage(String msg) {
+    maybeInitBuilder();
+    if (msg == null) {
+      builder.clearMessage();
+      return;
+    }
+    builder.setMessage(msg);
+  }
+
+  private synchronized void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = BatchSaveFederationQueuePoliciesResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+}

+ 40 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/FederationQueueWeightPBImpl.java

@@ -114,6 +114,46 @@ public class FederationQueueWeightPBImpl extends FederationQueueWeight {
     builder.setHeadRoomAlpha(headRoomAlpha);
   }
 
+  @Override
+  public String getQueue() {
+    FederationQueueWeightProtoOrBuilder p = this.viaProto ? this.proto : this.builder;
+    boolean hasQueue = p.hasQueue();
+    if (hasQueue) {
+      return p.getQueue();
+    }
+    return null;
+  }
+
+  @Override
+  public void setQueue(String queue) {
+    maybeInitBuilder();
+    if (queue == null) {
+      builder.clearQueue();
+      return;
+    }
+    builder.setQueue(queue);
+  }
+
+  @Override
+  public String getPolicyManagerClassName() {
+    FederationQueueWeightProtoOrBuilder p = this.viaProto ? this.proto : this.builder;
+    boolean hasPolicyManagerClassName = p.hasPolicyManagerClassName();
+    if (hasPolicyManagerClassName) {
+      return p.getPolicyManagerClassName();
+    }
+    return null;
+  }
+
+  @Override
+  public void setPolicyManagerClassName(String policyManagerClassName) {
+    maybeInitBuilder();
+    if (policyManagerClassName == null) {
+      builder.clearPolicyManagerClassName();
+      return;
+    }
+    builder.setPolicyManagerClassName(policyManagerClassName);
+  }
+
   @Override
   public int hashCode() {
     return getProto().hashCode();

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java

@@ -177,6 +177,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
 import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
 
 /**
@@ -972,6 +974,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
     return null;
   }
 
+  @Override
+  public BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies(
+      BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException {
+    return null;
+  }
+
   @VisibleForTesting
   public HashMap<ApplicationId, List<ContainerId>> getApplicationContainerIdMap() {
     return applicationContainerIdMap;

+ 33 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java

@@ -99,6 +99,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
@@ -1056,11 +1058,41 @@ public class AdminService extends CompositeService implements
         "Please call Router's deregisterSubCluster to set.");
   }
 
+  /**
+   * In YARN-Federation mode, We will be storing the Policy information for Queues.
+   *
+   * RM does not support saveFederationQueuePolicy,
+   * saveFederationQueuePolicy is supported by Router.
+   *
+   * @param request saveFederationQueuePolicy Request
+   * @return Response from saveFederationQueuePolicy.
+   * @throws YarnException exceptions from yarn servers.
+   * @throws IOException if an IO error occurred.
+   */
   @Override
   public SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
       SaveFederationQueuePolicyRequest request) throws YarnException, IOException {
     throw new YarnException("It is not allowed to call the RM's saveFederationQueuePolicy. " +
-        " Please call Router's deregisterSubCluster to set Policy.");
+        " Please call Router's saveFederationQueuePolicy to set Policy.");
+  }
+
+  /**
+   * In YARN-Federation mode, this method provides a way to save queue policies in batches.
+   *
+   * RM does not support batchSaveFederationQueuePolicies,
+   * batchSaveFederationQueuePolicies is supported by Router.
+   *
+   * @param request BatchSaveFederationQueuePolicies Request
+   * @return Response from batchSaveFederationQueuePolicies.
+   * @throws YarnException exceptions from yarn servers.
+   * @throws IOException if an IO error occurred.
+   */
+  @Override
+  public BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies(
+      BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException {
+    throw new YarnException("It is not allowed to call the RM's " +
+        " batchSaveFederationQueuePolicies. " +
+        " Please call Router's batchSaveFederationQueuePolicies to set Policies.");
   }
 
   private void validateAttributesExists(

+ 33 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java

@@ -151,6 +151,8 @@ public final class RouterMetrics {
   private MutableGaugeInt numDeregisterSubClusterFailedRetrieved;
   @Metric("# of saveFederationQueuePolicy failed to be retrieved")
   private MutableGaugeInt numSaveFederationQueuePolicyFailedRetrieved;
+  @Metric("# of batchSaveFederationQueuePolicies failed to be retrieved")
+  private MutableGaugeInt numBatchSaveFederationQueuePoliciesFailedRetrieved;
   @Metric("# of refreshAdminAcls failed to be retrieved")
   private MutableGaugeInt numRefreshAdminAclsFailedRetrieved;
   @Metric("# of refreshServiceAcls failed to be retrieved")
@@ -299,6 +301,8 @@ public final class RouterMetrics {
   private MutableRate totalSucceededDeregisterSubClusterRetrieved;
   @Metric("Total number of successful Retrieved SaveFederationQueuePolicy and latency(ms)")
   private MutableRate totalSucceededSaveFederationQueuePolicyRetrieved;
+  @Metric("Total number of successful Retrieved BatchSaveFederationQueuePolicies and latency(ms)")
+  private MutableRate totalSucceededBatchSaveFederationQueuePoliciesRetrieved;
   @Metric("Total number of successful Retrieved RefreshAdminAcls and latency(ms)")
   private MutableRate totalSucceededRefreshAdminAclsRetrieved;
   @Metric("Total number of successful Retrieved RefreshServiceAcls and latency(ms)")
@@ -386,6 +390,7 @@ public final class RouterMetrics {
   private MutableQuantiles refreshUserToGroupsMappingsLatency;
   private MutableQuantiles refreshDeregisterSubClusterLatency;
   private MutableQuantiles saveFederationQueuePolicyLatency;
+  private MutableQuantiles batchSaveFederationQueuePoliciesLatency;
   private MutableQuantiles refreshAdminAclsLatency;
   private MutableQuantiles refreshServiceAclsLatency;
   private MutableQuantiles replaceLabelsOnNodesLatency;
@@ -598,7 +603,11 @@ public final class RouterMetrics {
         "latency of deregister subcluster timeouts", "ops", "latency", 10);
 
     saveFederationQueuePolicyLatency = registry.newQuantiles("saveFederationQueuePolicyLatency",
-        "latency of refresh subcluster timeouts", "ops", "latency", 10);
+        "latency of save federation queue policy timeouts", "ops", "latency", 10);
+
+    batchSaveFederationQueuePoliciesLatency = registry.newQuantiles(
+        "batchSaveFederationQueuePoliciesLatency",
+        "latency of batch save federationqueuepolicies timeouts", "ops", "latency", 10);
 
     refreshAdminAclsLatency = registry.newQuantiles("refreshAdminAclsLatency",
         "latency of refresh admin acls timeouts", "ops", "latency", 10);
@@ -934,6 +943,11 @@ public final class RouterMetrics {
     return totalSucceededSaveFederationQueuePolicyRetrieved.lastStat().numSamples();
   }
 
+  @VisibleForTesting
+  public long getNumSucceededBatchSaveFederationQueuePoliciesRetrieved() {
+    return totalSucceededBatchSaveFederationQueuePoliciesRetrieved.lastStat().numSamples();
+  }
+
   @VisibleForTesting
   public long getNumSucceededRefreshAdminAclsRetrieved() {
     return totalSucceededRefreshAdminAclsRetrieved.lastStat().numSamples();
@@ -1284,6 +1298,11 @@ public final class RouterMetrics {
     return totalSucceededSaveFederationQueuePolicyRetrieved.lastStat().mean();
   }
 
+  @VisibleForTesting
+  public double getLatencySucceededBatchSaveFederationQueuePoliciesRetrieved() {
+    return totalSucceededBatchSaveFederationQueuePoliciesRetrieved.lastStat().mean();
+  }
+
   @VisibleForTesting
   public double getLatencySucceededRefreshAdminAclsRetrieved() {
     return totalSucceededRefreshAdminAclsRetrieved.lastStat().mean();
@@ -1583,6 +1602,10 @@ public final class RouterMetrics {
     return numSaveFederationQueuePolicyFailedRetrieved.value();
   }
 
+  public int getBatchSaveFederationQueuePoliciesFailedRetrieved() {
+    return numBatchSaveFederationQueuePoliciesFailedRetrieved.value();
+  }
+
   public int getNumRefreshAdminAclsFailedRetrieved() {
     return numRefreshAdminAclsFailedRetrieved.value();
   }
@@ -1940,6 +1963,11 @@ public final class RouterMetrics {
     saveFederationQueuePolicyLatency.add(duration);
   }
 
+  public void succeededBatchSaveFederationQueuePoliciesRetrieved(long duration) {
+    totalSucceededBatchSaveFederationQueuePoliciesRetrieved.add(duration);
+    batchSaveFederationQueuePoliciesLatency.add(duration);
+  }
+
   public void succeededRefreshAdminAclsRetrieved(long duration) {
     totalSucceededRefreshAdminAclsRetrieved.add(duration);
     refreshAdminAclsLatency.add(duration);
@@ -2222,6 +2250,10 @@ public final class RouterMetrics {
     numSaveFederationQueuePolicyFailedRetrieved.incr();
   }
 
+  public void incrBatchSaveFederationQueuePoliciesFailedRetrieved() {
+    numBatchSaveFederationQueuePoliciesFailedRetrieved.incr();
+  }
+
   public void incrRefreshAdminAclsFailedRetrieved() {
     numRefreshAdminAclsFailedRetrieved.incr();
   }

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java

@@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -217,4 +219,10 @@ public class DefaultRMAdminRequestInterceptor
       SaveFederationQueuePolicyRequest request) throws YarnException, IOException {
     return rmAdminProxy.saveFederationQueuePolicy(request);
   }
+
+  @Override
+  public BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies(
+      BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException {
+    return rmAdminProxy.batchSaveFederationQueuePolicies(request);
+  }
 }

+ 98 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java

@@ -66,6 +66,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters;
 import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
 import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@@ -950,6 +952,100 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
     throw new YarnException("Unable to saveFederationQueuePolicy.");
   }
 
+  /**
+   * Batch Save the Queue Policies for the Federation.
+   *
+   * @param request BatchSaveFederationQueuePolicies Request
+   * @return Response from batchSaveFederationQueuePolicies.
+   * @throws YarnException exceptions from yarn servers.
+   * @throws IOException if an IO error occurred.
+   */
+  @Override
+  public BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies(
+      BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException {
+
+    // Parameter validation.
+    if (request == null) {
+      routerMetrics.incrBatchSaveFederationQueuePoliciesFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing BatchSaveFederationQueuePoliciesRequest request.", null);
+    }
+
+    List<FederationQueueWeight> federationQueueWeights = request.getFederationQueueWeights();
+    if (federationQueueWeights == null) {
+      routerMetrics.incrBatchSaveFederationQueuePoliciesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing FederationQueueWeights information.", null);
+    }
+
+    try {
+      long startTime = clock.getTime();
+      for (FederationQueueWeight federationQueueWeight : federationQueueWeights) {
+        saveFederationQueuePolicy(federationQueueWeight);
+      }
+      long stopTime = clock.getTime();
+      routerMetrics.succeededBatchSaveFederationQueuePoliciesRetrieved(stopTime - startTime);
+      return BatchSaveFederationQueuePoliciesResponse.newInstance("batch save policies success.");
+    } catch (Exception e) {
+      routerMetrics.incrBatchSaveFederationQueuePoliciesFailedRetrieved();
+      RouterServerUtil.logAndThrowException(e,
+          "Unable to batchSaveFederationQueuePolicies due to exception. " + e.getMessage());
+    }
+
+    routerMetrics.incrBatchSaveFederationQueuePoliciesFailedRetrieved();
+    throw new YarnException("Unable to batchSaveFederationQueuePolicies.");
+  }
+
+  /**
+   * Save FederationQueuePolicy.
+   *
+   * @param federationQueueWeight queue weight.
+   * @throws YarnException exceptions from yarn servers.
+   */
+  private void saveFederationQueuePolicy(FederationQueueWeight federationQueueWeight)
+      throws YarnException {
+
+    // Step1, Check whether the weight setting of the queue is as expected.
+    String queue = federationQueueWeight.getQueue();
+    String policyManagerClassName = federationQueueWeight.getPolicyManagerClassName();
+
+    if (StringUtils.isBlank(queue)) {
+      RouterServerUtil.logAndThrowException("Missing Queue information.", null);
+    }
+
+    if (StringUtils.isBlank(policyManagerClassName)) {
+      RouterServerUtil.logAndThrowException("Missing PolicyManagerClassName information.", null);
+    }
+
+    String amRmWeight = federationQueueWeight.getAmrmWeight();
+    FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight);
+
+    String routerWeight = federationQueueWeight.getRouterWeight();
+    FederationQueueWeight.checkSubClusterQueueWeightRatioValid(routerWeight);
+
+    String headRoomAlpha = federationQueueWeight.getHeadRoomAlpha();
+    FederationQueueWeight.checkHeadRoomAlphaValid(headRoomAlpha);
+
+    // Step2, parse amRMPolicyWeights.
+    Map<SubClusterIdInfo, Float> amRMPolicyWeights = getSubClusterWeightMap(amRmWeight);
+    LOG.debug("amRMPolicyWeights = {}.", amRMPolicyWeights);
+
+    // Step3, parse routerPolicyWeights.
+    Map<SubClusterIdInfo, Float> routerPolicyWeights = getSubClusterWeightMap(routerWeight);
+    LOG.debug("routerWeights = {}.", amRMPolicyWeights);
+
+    // Step4, Initialize WeightedPolicyInfo.
+    WeightedPolicyInfo weightedPolicyInfo = new WeightedPolicyInfo();
+    weightedPolicyInfo.setHeadroomAlpha(Float.parseFloat(headRoomAlpha));
+    weightedPolicyInfo.setAMRMPolicyWeights(amRMPolicyWeights);
+    weightedPolicyInfo.setRouterPolicyWeights(routerPolicyWeights);
+
+    // Step5, Set SubClusterPolicyConfiguration.
+    SubClusterPolicyConfiguration policyConfiguration =
+        SubClusterPolicyConfiguration.newInstance(queue, policyManagerClassName,
+        weightedPolicyInfo.toByteBuffer());
+    federationFacade.setPolicyConfiguration(policyConfiguration);
+  }
+
   /**
    * Get the Map of SubClusterWeight.
    *
@@ -961,8 +1057,9 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
    *
    * @param policyWeight policyWeight.
    * @return Map of SubClusterWeight.
+   * @throws YarnException exceptions from yarn servers.
    */
-  private Map<SubClusterIdInfo, Float> getSubClusterWeightMap(String policyWeight)
+  protected Map<SubClusterIdInfo, Float> getSubClusterWeightMap(String policyWeight)
       throws YarnException {
     FederationQueueWeight.checkSubClusterQueueWeightRatioValid(policyWeight);
     Map<SubClusterIdInfo, Float> result = new HashMap<>();

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java

@@ -67,6 +67,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
 import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
 import org.apache.hadoop.yarn.util.LRUCacheHashMap;
@@ -401,4 +403,11 @@ public class RouterRMAdminService extends AbstractService
     RequestInterceptorChainWrapper pipeline = getInterceptorChain();
     return pipeline.getRootInterceptor().saveFederationQueuePolicy(request);
   }
+
+  @Override
+  public BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies(
+      BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor().batchSaveFederationQueuePolicies(request);
+  }
 }

+ 36 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java

@@ -628,6 +628,11 @@ public class TestRouterMetrics {
       LOG.info("Mocked: failed refreshClusterMaxPriority call");
       metrics.incrSaveFederationQueuePolicyFailedRetrieved();
     }
+
+    public void getBatchSaveFederationQueuePoliciesFailedRetrieved() {
+      LOG.info("Mocked: failed BatchSaveFederationQueuePolicies call");
+      metrics.incrBatchSaveFederationQueuePoliciesFailedRetrieved();
+    }
   }
 
   // Records successes for all calls
@@ -963,6 +968,12 @@ public class TestRouterMetrics {
           duration);
       metrics.succeededSaveFederationQueuePolicyRetrieved(duration);
     }
+
+    public void getBatchSaveFederationQueuePoliciesRetrieved(long duration) {
+      LOG.info("Mocked: successful BatchSaveFederationQueuePoliciesRetrieved " +
+          " call with duration {}", duration);
+      metrics.succeededBatchSaveFederationQueuePoliciesRetrieved(duration);
+    }
   }
 
   @Test
@@ -2241,4 +2252,29 @@ public class TestRouterMetrics {
     Assert.assertEquals(225,
         metrics.getLatencySucceededSaveFederationQueuePolicyRetrieved(), ASSERT_DOUBLE_DELTA);
   }
+
+  @Test
+  public void testGetBatchSaveFederationQueuePoliciesFailedRetrieved() {
+    long totalBadBefore = metrics.getBatchSaveFederationQueuePoliciesFailedRetrieved();
+    badSubCluster.getBatchSaveFederationQueuePoliciesFailedRetrieved();
+    Assert.assertEquals(totalBadBefore + 1,
+        metrics.getBatchSaveFederationQueuePoliciesFailedRetrieved());
+  }
+
+  @Test
+  public void testGetBatchSaveFederationQueuePoliciesRetrieved() {
+    long totalGoodBefore = metrics.getNumSucceededBatchSaveFederationQueuePoliciesRetrieved();
+    goodSubCluster.getBatchSaveFederationQueuePoliciesRetrieved(150);
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededBatchSaveFederationQueuePoliciesRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededBatchSaveFederationQueuePoliciesRetrieved(),
+        ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getBatchSaveFederationQueuePoliciesRetrieved(300);
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededBatchSaveFederationQueuePoliciesRetrieved());
+    Assert.assertEquals(225,
+        metrics.getLatencySucceededBatchSaveFederationQueuePoliciesRetrieved(),
+        ASSERT_DOUBLE_DELTA);
+  }
 }

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java

@@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterReq
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
 
 /**
  * Mock interceptor that does not do anything other than forwarding it to the
@@ -169,4 +171,10 @@ public class PassThroughRMAdminRequestInterceptor
       SaveFederationQueuePolicyRequest request) throws YarnException, IOException {
     return getNextInterceptor().saveFederationQueuePolicy(request);
   }
+
+  @Override
+  public BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies(
+      BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException {
+    return getNextInterceptor().batchSaveFederationQueuePolicies(request);
+  }
 }

+ 144 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.router.rmadmin;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.test.LambdaTestUtils;
@@ -59,6 +60,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
 import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
 import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
@@ -74,6 +77,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -81,6 +85,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.HashSet;
+import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -700,4 +705,143 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest {
     assertNotNull(sc2AMRMWeight);
     assertEquals(0.4f, sc2AMRMWeight.floatValue(), 0.00001);
   }
+
+  @Test
+  public void testBatchSaveFederationQueuePoliciesRequest() throws IOException, YarnException {
+
+    // subClusters
+    List<String> subClusterLists = new ArrayList<>();
+    subClusterLists.add("SC-1");
+    subClusterLists.add("SC-2");
+
+    // generate queue A, queue B, queue C
+    FederationQueueWeight rootA = generateFederationQueueWeight("root.a", subClusterLists);
+    FederationQueueWeight rootB = generateFederationQueueWeight("root.b", subClusterLists);
+    FederationQueueWeight rootC = generateFederationQueueWeight("root.b", subClusterLists);
+
+    List<FederationQueueWeight> federationQueueWeights = new ArrayList<>();
+    federationQueueWeights.add(rootA);
+    federationQueueWeights.add(rootB);
+    federationQueueWeights.add(rootC);
+
+    // Step1. Save Queue Policies in Batches
+    BatchSaveFederationQueuePoliciesRequest request =
+        BatchSaveFederationQueuePoliciesRequest.newInstance(federationQueueWeights);
+
+    BatchSaveFederationQueuePoliciesResponse policiesResponse =
+        interceptor.batchSaveFederationQueuePolicies(request);
+
+    assertNotNull(policiesResponse);
+    assertNotNull(policiesResponse.getMessage());
+    assertEquals("batch save policies success.", policiesResponse.getMessage());
+
+    // Step2. We query Policy information from FederationStateStore.
+    FederationStateStoreFacade federationFacade = interceptor.getFederationFacade();
+    SubClusterPolicyConfiguration policyConfiguration =
+        federationFacade.getPolicyConfiguration("root.a");
+    assertNotNull(policyConfiguration);
+    assertEquals("root.a", policyConfiguration.getQueue());
+
+    ByteBuffer params = policyConfiguration.getParams();
+    assertNotNull(params);
+    WeightedPolicyInfo weightedPolicyInfo = WeightedPolicyInfo.fromByteBuffer(params);
+    assertNotNull(weightedPolicyInfo);
+    Map<SubClusterIdInfo, Float> amrmPolicyWeights = weightedPolicyInfo.getAMRMPolicyWeights();
+    Map<SubClusterIdInfo, Float> routerPolicyWeights = weightedPolicyInfo.getRouterPolicyWeights();
+
+    SubClusterIdInfo sc1 = new SubClusterIdInfo("SC-1");
+    SubClusterIdInfo sc2 = new SubClusterIdInfo("SC-2");
+
+    // Check whether the AMRMWeight of SC-1 and SC-2 of root.a meet expectations
+    FederationQueueWeight queueWeight = federationQueueWeights.get(0);
+    Map<SubClusterIdInfo, Float> subClusterAmrmWeightMap =
+        interceptor.getSubClusterWeightMap(queueWeight.getAmrmWeight());
+    Float sc1ExpectedAmrmWeightFloat = amrmPolicyWeights.get(sc1);
+    Float sc1AmrmWeightFloat = subClusterAmrmWeightMap.get(sc1);
+    assertNotNull(sc1AmrmWeightFloat);
+    assertEquals(sc1ExpectedAmrmWeightFloat, sc1AmrmWeightFloat, 0.00001);
+
+    Float sc2ExpectedAmrmWeightFloat = amrmPolicyWeights.get(sc2);
+    Float sc2AmrmWeightFloat = subClusterAmrmWeightMap.get(sc2);
+    assertNotNull(sc2ExpectedAmrmWeightFloat);
+    assertEquals(sc2ExpectedAmrmWeightFloat, sc2AmrmWeightFloat, 0.00001);
+
+    // Check whether the RouterPolicyWeight of SC-1 and SC-2 of root.a meet expectations
+    Map<SubClusterIdInfo, Float> subClusterRouterWeightMap =
+        interceptor.getSubClusterWeightMap(queueWeight.getRouterWeight());
+    Float sc1ExpectedRouterWeightFloat = routerPolicyWeights.get(sc1);
+    Float sc1RouterWeightFloat = subClusterRouterWeightMap.get(sc1);
+    assertNotNull(sc1RouterWeightFloat);
+    assertEquals(sc1ExpectedRouterWeightFloat, sc1RouterWeightFloat, 0.00001);
+
+    Float sc2ExpectedRouterWeightFloat = routerPolicyWeights.get(sc2);
+    Float sc2RouterWeightFloat = subClusterRouterWeightMap.get(sc2);
+    assertNotNull(sc2ExpectedRouterWeightFloat);
+    assertEquals(sc2ExpectedRouterWeightFloat, sc2RouterWeightFloat, 0.00001);
+  }
+
+  /**
+   * Generate FederationQueueWeight.
+   * We will generate the weight information of the queue.
+   *
+   * @param queue queue name
+   * @param pSubClusters subClusters
+   * @return subCluster FederationQueueWeight
+   */
+  private FederationQueueWeight generateFederationQueueWeight(
+      String queue, List<String> pSubClusters) {
+    String routerWeight = generatePolicyWeight(pSubClusters);
+    String amrmWeight = generatePolicyWeight(pSubClusters);
+    String policyTypeName = WeightedLocalityPolicyManager.class.getCanonicalName();
+    String headRoomAlpha = "1.0";
+    return FederationQueueWeight.newInstance(routerWeight, amrmWeight, headRoomAlpha,
+        queue, policyTypeName);
+  }
+
+  /**
+   * Generating Policy Weight Data.
+   *
+   * @param pSubClusters set of sub-clusters.
+   * @return policy Weight String, like SC-1:0.7,SC-2:0.
+   */
+  private String generatePolicyWeight(List<String> pSubClusters) {
+    List<String> weights = generateWeights(subClusters.size());
+    List<String> subClusterWeight = new ArrayList<>();
+    for (int i = 0; i < pSubClusters.size(); i++) {
+      String subCluster = pSubClusters.get(i);
+      String weight = weights.get(i);
+      subClusterWeight.add(subCluster + ":" + weight);
+    }
+    return StringUtils.join(subClusterWeight, ",");
+  }
+
+  /**
+   * Generate a set of random numbers, and the sum of the numbers is 1.
+   *
+   * @param n number of random numbers generated.
+   * @return a set of random numbers
+   */
+  private List<String> generateWeights(int n) {
+    List<Float> randomNumbers = new ArrayList<>();
+    float total = 0.0f;
+
+    Random random = new Random();
+    for (int i = 0; i < n - 1; i++) {
+      float randNum = random.nextFloat();
+      randomNumbers.add(randNum);
+      total += randNum;
+    }
+
+    float lastNumber = 1 - total;
+    randomNumbers.add(lastNumber);
+
+    DecimalFormat decimalFormat = new DecimalFormat("#.##");
+    List<String> formattedRandomNumbers = new ArrayList<>();
+    for (double number : randomNumbers) {
+      String formattedNumber = decimalFormat.format(number);
+      formattedRandomNumbers.add(formattedNumber);
+    }
+
+    return formattedRandomNumbers;
+  }
 }