Explorar o código

YARN-11525. [Federation] Router CLI Supports Save the SubClusterPolicyConfiguration Of Queues. (#5816)

slfan1989 hai 1 ano
pai
achega
23ecc32d3a
Modificáronse 26 ficheiros con 1322 adicións e 11 borrados
  1. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java
  2. 169 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java
  3. 70 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SaveFederationQueuePolicyRequest.java
  4. 46 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SaveFederationQueuePolicyResponse.java
  5. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto
  6. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
  7. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
  8. 134 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java
  9. 65 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java
  10. 19 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java
  11. 20 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java
  12. 132 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/FederationQueueWeightPBImpl.java
  13. 162 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SaveFederationQueuePolicyRequestPBImpl.java
  14. 98 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SaveFederationQueuePolicyResponsePBImpl.java
  15. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterIdInfo.java
  16. 13 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
  17. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
  18. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
  19. 31 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
  20. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
  21. 116 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
  22. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java
  23. 33 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
  24. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java
  25. 97 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java
  26. 37 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java

@@ -58,6 +58,9 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceReque
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
+
 
 @Private
 public interface ResourceManagerAdministrationProtocol extends GetUserMappingsProtocol {
@@ -173,4 +176,17 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
   @Idempotent
   DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterRequest request)
       throws YarnException, IOException;
+
+  /**
+   * In YARN-Federation mode, We will be storing the Policy information for Queues.
+   *
+   * @param request saveFederationQueuePolicy Request
+   * @return Response from saveFederationQueuePolicy.
+   * @throws YarnException exceptions from yarn servers.
+   * @throws IOException if an IO error occurred.
+   */
+  @Private
+  @Idempotent
+  SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
+      SaveFederationQueuePolicyRequest request) throws YarnException, IOException;
 }

+ 169 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/FederationQueueWeight.java

@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Queue weights for representing Federation.
+ */
+@Private
+@Unstable
+public abstract class FederationQueueWeight {
+
+  /**
+   * The FederationQueueWeight object consists of three parts:
+   * routerWeight, amrmWeight, and headRoomAlpha.
+   *
+   * @param routerWeight Weight for routing applications to different subclusters.
+   * We will route the application to different subclusters based on the configured weights.
+   * Assuming we have two subclusters, SC-1 and SC-2,
+   * with a weight of 0.7 for SC-1 and 0.3 for SC-2,
+   * the application will be allocated in such a way
+   * that 70% of the applications will be assigned to SC-1 and 30% to SC-2.
+   *
+   * @param amrmWeight Weight for resource request from ApplicationMaster (AM) to
+   * different subclusters' Resource Manager (RM).
+   * Assuming we have two subclusters, SC-1 and SC-2,
+   * with a weight of 0.6 for SC-1 and 0.4 for SC-2,
+   * When AM requesting resources,
+   * 60% of the requests will be made to the Resource Manager (RM) of SC-1
+   * and 40% to the RM of SC-2.
+   *
+   * @param headRoomAlpha
+   * used by policies that balance weight-based and load-based considerations in their decisions.
+   * For policies that use this parameter,
+   * values close to 1 indicate that most of the decision
+   * should be based on currently observed headroom from various sub-clusters,
+   * values close to zero, indicate that the decision should be
+   * mostly based on weights and practically ignore current load.
+   *
+   * @return FederationQueueWeight
+   */
+  @Private
+  @Unstable
+  public static FederationQueueWeight newInstance(String routerWeight,
+      String amrmWeight, String headRoomAlpha) {
+    FederationQueueWeight federationQueueWeight = Records.newRecord(FederationQueueWeight.class);
+    federationQueueWeight.setRouterWeight(routerWeight);
+    federationQueueWeight.setAmrmWeight(amrmWeight);
+    federationQueueWeight.setHeadRoomAlpha(headRoomAlpha);
+    return federationQueueWeight;
+  }
+
+  @Public
+  @Unstable
+  public abstract String getRouterWeight();
+
+  @Public
+  @Unstable
+  public abstract void setRouterWeight(String routerWeight);
+
+  @Public
+  @Unstable
+  public abstract String getAmrmWeight();
+
+  @Public
+  @Unstable
+  public abstract void setAmrmWeight(String amrmWeight);
+
+  @Public
+  @Unstable
+  public abstract String getHeadRoomAlpha();
+
+  @Public
+  @Unstable
+  public abstract void setHeadRoomAlpha(String headRoomAlpha);
+
+  private static final String COMMA = ",";
+  private static final String COLON = ":";
+
+  /**
+   * Check if the subCluster Queue Weight Ratio are valid.
+   *
+   * This method can be used to validate RouterPolicyWeight and AMRMPolicyWeight.
+   *
+   * @param subClusterWeight the weight ratios of subClusters.
+   * @throws YarnException exceptions from yarn servers.
+   */
+  public static void checkSubClusterQueueWeightRatioValid(String subClusterWeight)
+      throws YarnException {
+    // The subClusterWeight cannot be empty.
+    if (StringUtils.isBlank(subClusterWeight)) {
+      throw new YarnException("subClusterWeight can't be empty!");
+    }
+
+    // SC-1:0.7,SC-2:0.3 -> [SC-1:0.7,SC-2:0.3]
+    String[] subClusterWeights = subClusterWeight.split(COMMA);
+    Map<String, Double> subClusterWeightMap = new LinkedHashMap<>();
+    for (String subClusterWeightItem : subClusterWeights) {
+      // SC-1:0.7 -> [SC-1,0.7]
+      // We require that the parsing result is not empty and must have a length of 2.
+      String[] subClusterWeightItems = subClusterWeightItem.split(COLON);
+      if (subClusterWeightItems == null || subClusterWeightItems.length != 2) {
+        throw new YarnException("The subClusterWeight cannot be empty," +
+            " and the subClusterWeight size must be 2. (eg.SC-1,0.2)");
+      }
+      subClusterWeightMap.put(subClusterWeightItems[0], Double.valueOf(subClusterWeightItems[1]));
+    }
+
+    // The sum of weight ratios for subClusters must be equal to 1.
+    double sum = subClusterWeightMap.values().stream().mapToDouble(Double::doubleValue).sum();
+    boolean isValid = Math.abs(sum - 1.0) < 1e-6; // Comparing with a tolerance of 1e-6
+
+    if (!isValid) {
+      throw new YarnException("The sum of ratios for all subClusters must be equal to 1.");
+    }
+  }
+
+  /**
+   * Check if HeadRoomAlpha is a number and is between 0 and 1.
+   *
+   * @param headRoomAlpha headroomalpha.
+   * @throws YarnException exceptions from yarn servers.
+   */
+  public static void checkHeadRoomAlphaValid(String headRoomAlpha) throws YarnException {
+    if (!isNumeric(headRoomAlpha)) {
+      throw new YarnException("HeadRoomAlpha must be a number.");
+    }
+
+    double dHeadRoomAlpha = Double.parseDouble(headRoomAlpha);
+    if (!(dHeadRoomAlpha >= 0 && dHeadRoomAlpha <= 1)) {
+      throw new YarnException("HeadRoomAlpha must be between 0-1.");
+    }
+  }
+
+  /**
+   * Determines whether the given value is a number.
+   *
+   * @param value given value.
+   * @return true, is a number, false, not a number.
+   */
+  protected static boolean isNumeric(String value) {
+    return NumberUtils.isCreatable(value);
+  }
+}

+ 70 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SaveFederationQueuePolicyRequest.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * In Yarn Federation mode, this class is used to save the queue policy interface.
+ *
+ * This class stores the queue, the weight of the queue,
+ * and the PolicyManagerClassName information of the queue.
+ */
+@Private
+@Unstable
+public abstract class SaveFederationQueuePolicyRequest {
+
+  @Private
+  @Unstable
+  public static SaveFederationQueuePolicyRequest newInstance(
+      String queue, FederationQueueWeight federationQueueWeight, String policyManagerClassName) {
+    SaveFederationQueuePolicyRequest request =
+        Records.newRecord(SaveFederationQueuePolicyRequest.class);
+    request.setQueue(queue);
+    request.setFederationQueueWeight(federationQueueWeight);
+    request.setPolicyManagerClassName(policyManagerClassName);
+    return request;
+  }
+
+  @Public
+  @Unstable
+  public abstract FederationQueueWeight getFederationQueueWeight();
+
+  @Private
+  @Unstable
+  public abstract void setFederationQueueWeight(FederationQueueWeight federationQueueWeight);
+
+  @Public
+  @Unstable
+  public abstract String getQueue();
+
+  @Public
+  @Unstable
+  public abstract void setQueue(String queue);
+
+  @Public
+  @Unstable
+  public abstract String getPolicyManagerClassName();
+
+  @Public
+  @Unstable
+  public abstract void setPolicyManagerClassName(String className);
+}

+ 46 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/SaveFederationQueuePolicyResponse.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+@Private
+@Unstable
+public abstract class SaveFederationQueuePolicyResponse {
+  public static SaveFederationQueuePolicyResponse newInstance() {
+    return Records.newRecord(SaveFederationQueuePolicyResponse.class);
+  }
+
+  public static SaveFederationQueuePolicyResponse newInstance(String msg) {
+    SaveFederationQueuePolicyResponse response =
+        Records.newRecord(SaveFederationQueuePolicyResponse.class);
+    response.setMessage(msg);
+    return response;
+  }
+
+  @Public
+  @Unstable
+  public abstract String getMessage();
+
+  @Public
+  @Unstable
+  public abstract void setMessage(String msg);
+}

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto

@@ -48,4 +48,5 @@ service ResourceManagerAdministrationProtocolService {
   rpc refreshClusterMaxPriority(RefreshClusterMaxPriorityRequestProto) returns (RefreshClusterMaxPriorityResponseProto);
   rpc mapAttributesToNodes(NodesToAttributesMappingRequestProto) returns (NodesToAttributesMappingResponseProto);
   rpc deregisterSubCluster(DeregisterSubClusterRequestProto) returns (DeregisterSubClusterResponseProto);
+  rpc saveFederationQueuePolicy(SaveFederationQueuePolicyRequestProto) returns (SaveFederationQueuePolicyResponseProto);
 }

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto

@@ -170,6 +170,16 @@ message DeregisterSubClusterResponseProto {
   repeated DeregisterSubClustersProto deregisterSubClusters = 1;
 }
 
+message SaveFederationQueuePolicyRequestProto {
+  required string queue = 1;
+  required FederationQueueWeightProto federationQueueWeight = 2;
+  optional string policyManagerClassName = 3;
+}
+
+message SaveFederationQueuePolicyResponseProto {
+  required string message = 1;
+}
+
 //////////////////////////////////////////////////////////////////
 ///////////// RM Failover related records ////////////////////////
 //////////////////////////////////////////////////////////////////

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

@@ -440,6 +440,12 @@ message DeregisterSubClustersProto {
   optional string subClusterState = 5;
 }
 
+message FederationQueueWeightProto {
+  optional string routerWeight = 1;
+  optional string amrmWeight = 2;
+  optional string headRoomAlpha = 3;
+}
+
 ////////////////////////////////////////////////////////////////////////
 ////// From AM_RM_Protocol /////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////

+ 134 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java

@@ -38,6 +38,11 @@ import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStreamWriter;
@@ -48,14 +53,34 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight.checkHeadRoomAlphaValid;
+import static org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight.checkSubClusterQueueWeightRatioValid;
+
 public class RouterCLI extends Configured implements Tool {
 
+
+  private static final Logger LOG = LoggerFactory.getLogger(RouterCLI.class);
+
   protected final static Map<String, UsageInfo> ADMIN_USAGE =
-      ImmutableMap.<String, UsageInfo>builder().put("-deregisterSubCluster",
-        new UsageInfo("[-sc|--subClusterId [subCluster Id]]",
+      ImmutableMap.<String, UsageInfo>builder()
+         // Command1: deregisterSubCluster
+        .put("-deregisterSubCluster", new UsageInfo(
+        "[-sc|--subClusterId [subCluster Id]]",
         "Deregister SubCluster, If the interval between the heartbeat time of the subCluster " +
         "and the current time exceeds the timeout period, " +
-        "set the state of the subCluster to SC_LOST")).build();
+        "set the state of the subCluster to SC_LOST."))
+         // Command2: policy
+        .put("-policy", new UsageInfo(
+        "[-s|--save [queue;router weight;amrm weight;headroomalpha]]",
+        "We provide a set of commands for Policy:" +
+        " Include list policies, save policies, batch save policies. " +
+        " (Note: The policy type will be directly read from the" +
+        " yarn.federation.policy-manager in the local yarn-site.xml.)" +
+        " eg. (routeradmin -policy [-s|--save] root.a;SC-1:0.7,SC-2:0.3;SC-1:0.7,SC-2:0.3;1.0)"))
+        .build();
+
+  // Common Constant
+  private static final String SEMICOLON = ";";
 
   // Command Constant
   private static final String CMD_EMPTY = "";
@@ -74,6 +99,12 @@ public class RouterCLI extends Configured implements Tool {
   private static final String CMD_DEREGISTERSUBCLUSTER = "-deregisterSubCluster";
   private static final String CMD_HELP = "-help";
 
+  // Command2: policy
+  // save policy
+  private static final String OPTION_S = "s";
+  private static final String OPTION_SAVE = "save";
+  private static final String CMD_POLICY = "-policy";
+
   public RouterCLI() {
     super();
   }
@@ -128,9 +159,10 @@ public class RouterCLI extends Configured implements Tool {
     summary.append("routeradmin is the command to execute ")
         .append("YARN Federation administrative commands.\n")
         .append("The full syntax is: \n\n")
-        .append("routeradmin")
-        .append(" [-deregisterSubCluster [-sc|--subClusterId [subCluster Id]]")
-        .append(" [-help [cmd]]").append("\n");
+        .append("routeradmin\n")
+        .append("   [-deregisterSubCluster [-sc|--subClusterId [subCluster Id]]\n")
+        .append("   [-policy [-s|--save [queue;router weight;amrm weight;headroomalpha]]\n")
+        .append("   [-help [cmd]]").append("\n");
     StringBuilder helpBuilder = new StringBuilder();
     System.out.println(summary);
 
@@ -260,6 +292,98 @@ public class RouterCLI extends Configured implements Tool {
     return EXIT_SUCCESS;
   }
 
+  private int handlePolicy(String[] args)
+      throws IOException, YarnException, ParseException {
+
+    // Prepare Options.
+    Options opts = new Options();
+    opts.addOption("policy", false,
+        "We provide a set of commands for Policy Include list policies, " +
+        "save policies, batch save policies.");
+    Option saveOpt = new Option(OPTION_S, OPTION_SAVE, true,
+        "We will save the policy information of the queue, " +
+        "including queue and weight information");
+    saveOpt.setOptionalArg(true);
+    opts.addOption(saveOpt);
+
+    // Parse command line arguments.
+    CommandLine cliParser;
+    try {
+      cliParser = new GnuParser().parse(opts, args);
+    } catch (MissingArgumentException ex) {
+      System.out.println("Missing argument for options");
+      printUsage(args[0]);
+      return EXIT_ERROR;
+    }
+
+    // Try to parse the cmd save.
+    if (cliParser.hasOption(OPTION_S) || cliParser.hasOption(OPTION_SAVE)) {
+      String policy = cliParser.getOptionValue(OPTION_S);
+      if (StringUtils.isBlank(policy)) {
+        policy = cliParser.getOptionValue(OPTION_SAVE);
+      }
+      return handleSavePolicy(policy);
+    }
+
+    return EXIT_ERROR;
+  }
+
+  private int handleSavePolicy(String policy) {
+    LOG.info("Save Federation Policy = {}.", policy);
+    try {
+      SaveFederationQueuePolicyRequest request = parsePolicy(policy);
+      ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
+      SaveFederationQueuePolicyResponse response = adminProtocol.saveFederationQueuePolicy(request);
+      System.out.println(response.getMessage());
+      return EXIT_SUCCESS;
+    } catch (YarnException | IOException e) {
+      LOG.error("handleSavePolicy error.", e);
+      return EXIT_ERROR;
+    }
+  }
+
+  /**
+   * We will parse the policy, and it has specific formatting requirements.
+   *
+   * 1. queue,router weight,amrm weight,headroomalpha {@link FederationQueueWeight}.
+   * 2. the sum of weights for all sub-clusters in routerWeight/amrmWeight should be 1.
+   *
+   * @param policy queue weight.
+   * @return If the conversion is correct, we will get the FederationQueueWeight,
+   * otherwise an exception will be thrown.
+   * @throws YarnException exceptions from yarn servers.
+   */
+  protected SaveFederationQueuePolicyRequest parsePolicy(String policy) throws YarnException {
+
+    String[] policyItems = policy.split(SEMICOLON);
+    if (policyItems == null || policyItems.length != 4) {
+      throw new YarnException("The policy cannot be empty or the policy is incorrect. \n" +
+          " Required information to provide: queue,router weight,amrm weight,headroomalpha \n" +
+          " eg. root.a;SC-1:0.7,SC-2:0.3;SC-1:0.7,SC-2:0.3;1.0");
+    }
+
+    String queue = policyItems[0];
+    String routerWeight = policyItems[1];
+    String amrmWeight = policyItems[2];
+    String headroomalpha = policyItems[3];
+
+    LOG.info("Policy: [Queue = {}, RouterWeight = {}, AmRmWeight = {}, Headroomalpha = {}]",
+        queue, routerWeight, amrmWeight, headroomalpha);
+
+    checkSubClusterQueueWeightRatioValid(routerWeight);
+    checkSubClusterQueueWeightRatioValid(amrmWeight);
+    checkHeadRoomAlphaValid(headroomalpha);
+
+    FederationQueueWeight federationQueueWeight =
+        FederationQueueWeight.newInstance(routerWeight, amrmWeight, headroomalpha);
+    String policyManager = getConf().get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+        YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
+    SaveFederationQueuePolicyRequest request = SaveFederationQueuePolicyRequest.newInstance(
+        queue, federationQueueWeight, policyManager);
+
+    return request;
+  }
+
   @Override
   public int run(String[] args) throws Exception {
     YarnConfiguration yarnConf = getConf() == null ?
@@ -287,6 +411,10 @@ public class RouterCLI extends Configured implements Tool {
       return handleDeregisterSubCluster(args);
     }
 
+    if (CMD_POLICY.equals(cmd)) {
+      return handlePolicy(args);
+    }
+
     return EXIT_SUCCESS;
   }
 

+ 65 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java

@@ -19,11 +19,16 @@ package org.apache.hadoop.yarn.client.cli;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.stubbing.Answer;
@@ -35,6 +40,7 @@ import java.util.Date;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -49,6 +55,7 @@ public class TestRouterCLI {
   public void setup() throws Exception {
 
     admin = mock(ResourceManagerAdministrationProtocol.class);
+
     when(admin.deregisterSubCluster(any(DeregisterSubClusterRequest.class)))
         .thenAnswer((Answer<DeregisterSubClusterResponse>) invocationOnMock -> {
           // Step1. parse subClusterId.
@@ -63,6 +70,14 @@ public class TestRouterCLI {
           }
         });
 
+    when(admin.saveFederationQueuePolicy(any(SaveFederationQueuePolicyRequest.class)))
+        .thenAnswer((Answer<SaveFederationQueuePolicyResponse>) invocationOnMock -> {
+          // Step1. parse subClusterId.
+          Object obj = invocationOnMock.getArgument(0);
+          SaveFederationQueuePolicyRequest request = (SaveFederationQueuePolicyRequest) obj;
+          return SaveFederationQueuePolicyResponse.newInstance("success");
+        });
+
     Configuration config = new Configuration();
     config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
 
@@ -114,6 +129,9 @@ public class TestRouterCLI {
 
     args = new String[]{"-help", "-deregisterSubCluster"};
     rmAdminCLI.run(args);
+
+    args = new String[]{"-help", "-policy"};
+    rmAdminCLI.run(args);
   }
 
   @Test
@@ -152,4 +170,51 @@ public class TestRouterCLI {
     assertEquals(0, rmAdminCLI.run(args));
 
   }
+
+  @Test
+  public void testParsePolicy() throws Exception {
+    // Case1, If policy is empty.
+    String errMsg1 = "The policy cannot be empty or the policy is incorrect. \n" +
+        " Required information to provide: queue,router weight,amrm weight,headroomalpha \n" +
+        " eg. root.a;SC-1:0.7,SC-2:0.3;SC-1:0.7,SC-2:0.3;1.0";
+    LambdaTestUtils.intercept(YarnException.class, errMsg1, () ->  rmAdminCLI.parsePolicy(""));
+
+    // Case2, If policy is incomplete, We need 4 items, but only 2 of them are provided.
+    LambdaTestUtils.intercept(YarnException.class, errMsg1,
+        () ->  rmAdminCLI.parsePolicy("root.a;SC-1:0.1,SC-2:0.9;"));
+
+    // Case3, If policy is incomplete, The weight of a subcluster is missing.
+    String errMsg2 = "The subClusterWeight cannot be empty, " +
+        "and the subClusterWeight size must be 2. (eg.SC-1,0.2)";
+    LambdaTestUtils.intercept(YarnException.class, errMsg2,
+        () ->  rmAdminCLI.parsePolicy("root.a;SC-1:0.1,SC-2;SC-1:0.1,SC-2;0.3,1.0"));
+
+    // Case4, The policy is complete, but the sum of weights for each subcluster is not equal to 1.
+    String errMsg3 = "The sum of ratios for all subClusters must be equal to 1.";
+    LambdaTestUtils.intercept(YarnException.class, errMsg3,
+        () ->  rmAdminCLI.parsePolicy("root.a;SC-1:0.1,SC-2:0.8;SC-1:0.1,SC-2;0.3,1.0"));
+
+    // If policy is root.a;SC-1:0.7,SC-2:0.3;SC-1:0.7,SC-2:0.3;1.0
+    String policy = "root.a;SC-1:0.7,SC-2:0.3;SC-1:0.6,SC-2:0.4;1.0";
+    SaveFederationQueuePolicyRequest request = rmAdminCLI.parsePolicy(policy);
+    FederationQueueWeight federationQueueWeight = request.getFederationQueueWeight();
+    assertNotNull(federationQueueWeight);
+    assertEquals("SC-1:0.7,SC-2:0.3", federationQueueWeight.getRouterWeight());
+    assertEquals("SC-1:0.6,SC-2:0.4", federationQueueWeight.getAmrmWeight());
+    assertEquals("1.0", federationQueueWeight.getHeadRoomAlpha());
+  }
+
+  @Test
+  public void testSavePolicy() throws Exception {
+    PrintStream oldOutPrintStream = System.out;
+    ByteArrayOutputStream dataOut = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(dataOut));
+    oldOutPrintStream.println(dataOut);
+
+    String[] args = {"-policy", "-s", "root.a;SC-1:0.1,SC-2:0.9;SC-1:0.7,SC-2:0.3;1.0"};
+    assertEquals(0, rmAdminCLI.run(args));
+
+    args = new String[]{"-policy", "-save", "root.a;SC-1:0.1,SC-2:0.9;SC-1:0.7,SC-2:0.3;1.0"};
+    assertEquals(0, rmAdminCLI.run(args));
+  }
 }

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Remov
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyRequestProto;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
 import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
@@ -78,6 +79,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceReque
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
@@ -108,6 +111,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResou
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyResponsePBImpl;
 
 import org.apache.hadoop.thirdparty.protobuf.ServiceException;
 
@@ -362,4 +367,18 @@ public class ResourceManagerAdministrationProtocolPBClientImpl implements Resour
       return null;
     }
   }
+
+  @Override
+  public SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
+      SaveFederationQueuePolicyRequest request) throws YarnException, IOException {
+    SaveFederationQueuePolicyRequestProto requestProto =
+        ((SaveFederationQueuePolicyRequestPBImpl) request).getProto();
+    try {
+      return new SaveFederationQueuePolicyResponsePBImpl(
+          proxy.saveFederationQueuePolicy(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
 }

+ 20 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java

@@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Updat
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyResponseProto;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
 import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
@@ -75,6 +77,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResp
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl;
@@ -105,6 +109,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResou
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DeregisterSubClusterResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyResponsePBImpl;
 
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
 import org.apache.hadoop.thirdparty.protobuf.ServiceException;
@@ -379,4 +385,18 @@ public class ResourceManagerAdministrationProtocolPBServiceImpl implements Resou
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public SaveFederationQueuePolicyResponseProto saveFederationQueuePolicy(RpcController controller,
+      SaveFederationQueuePolicyRequestProto proto) throws ServiceException {
+    SaveFederationQueuePolicyRequest request = new SaveFederationQueuePolicyRequestPBImpl(proto);
+    try {
+      SaveFederationQueuePolicyResponse response = real.saveFederationQueuePolicy(request);
+      return ((SaveFederationQueuePolicyResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

+ 132 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/FederationQueueWeightPBImpl.java

@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.proto.YarnProtos.FederationQueueWeightProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnProtos.FederationQueueWeightProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight;
+
+@Private
+@Unstable
+public class FederationQueueWeightPBImpl extends FederationQueueWeight {
+
+  private FederationQueueWeightProto proto = FederationQueueWeightProto.getDefaultInstance();
+  private FederationQueueWeightProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  public FederationQueueWeightPBImpl() {
+    this.builder = FederationQueueWeightProto.newBuilder();
+  }
+
+  public FederationQueueWeightPBImpl(FederationQueueWeightProto proto) {
+    this.proto = proto;
+    this.viaProto = true;
+  }
+
+  private synchronized void maybeInitBuilder() {
+    if (this.viaProto || this.builder == null) {
+      this.builder = FederationQueueWeightProto.newBuilder(proto);
+    }
+    this.viaProto = false;
+  }
+
+  public FederationQueueWeightProto getProto() {
+    this.proto = this.viaProto ? this.proto : this.builder.build();
+    this.viaProto = true;
+    return this.proto;
+  }
+
+  @Override
+  public String getRouterWeight() {
+    FederationQueueWeightProtoOrBuilder p = this.viaProto ? this.proto : this.builder;
+    boolean hasRouterWeight = p.hasRouterWeight();
+    if (hasRouterWeight) {
+      return p.getRouterWeight();
+    }
+    return null;
+  }
+
+  @Override
+  public void setRouterWeight(String routerWeight) {
+    maybeInitBuilder();
+    if (routerWeight == null) {
+      builder.clearRouterWeight();
+      return;
+    }
+    builder.setRouterWeight(routerWeight);
+  }
+
+  @Override
+  public String getAmrmWeight() {
+    FederationQueueWeightProtoOrBuilder p = this.viaProto ? this.proto : this.builder;
+    boolean hasAmrmWeight = p.hasAmrmWeight();
+    if (hasAmrmWeight) {
+      return p.getAmrmWeight();
+    }
+    return null;
+  }
+
+  @Override
+  public void setAmrmWeight(String amrmWeight) {
+    maybeInitBuilder();
+    if (amrmWeight == null) {
+      builder.clearAmrmWeight();
+      return;
+    }
+    builder.setAmrmWeight(amrmWeight);
+  }
+
+  @Override
+  public String getHeadRoomAlpha() {
+    FederationQueueWeightProtoOrBuilder p = this.viaProto ? this.proto : this.builder;
+    boolean hasHeadRoomAlpha = p.hasHeadRoomAlpha();
+    if (hasHeadRoomAlpha) {
+      return p.getHeadRoomAlpha();
+    }
+    return null;
+  }
+
+  @Override
+  public void setHeadRoomAlpha(String headRoomAlpha) {
+    maybeInitBuilder();
+    if (headRoomAlpha == null) {
+      builder.clearHeadRoomAlpha();
+      return;
+    }
+    builder.setHeadRoomAlpha(headRoomAlpha);
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof FederationQueueWeight)) {
+      return false;
+    }
+    FederationQueueWeightPBImpl otherImpl = this.getClass().cast(other);
+    return new EqualsBuilder()
+        .append(this.getProto(), otherImpl.getProto())
+        .isEquals();
+  }
+}

+ 162 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SaveFederationQueuePolicyRequestPBImpl.java

@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.thirdparty.protobuf.TextFormat;
+import org.apache.hadoop.yarn.proto.YarnProtos.FederationQueueWeightProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
+
+@Private
+@Unstable
+public class SaveFederationQueuePolicyRequestPBImpl extends SaveFederationQueuePolicyRequest {
+
+  private SaveFederationQueuePolicyRequestProto proto =
+      SaveFederationQueuePolicyRequestProto.getDefaultInstance();
+  private SaveFederationQueuePolicyRequestProto.Builder builder = null;
+  private boolean viaProto = false;
+  private FederationQueueWeight federationQueueWeight = null;
+
+  public SaveFederationQueuePolicyRequestPBImpl() {
+    builder = SaveFederationQueuePolicyRequestProto.newBuilder();
+  }
+
+  public SaveFederationQueuePolicyRequestPBImpl(SaveFederationQueuePolicyRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  private synchronized void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = SaveFederationQueuePolicyRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  public SaveFederationQueuePolicyRequestProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof SaveFederationQueuePolicyRequest)) {
+      return false;
+    }
+
+    SaveFederationQueuePolicyRequestPBImpl otherImpl = this.getClass().cast(other);
+    return new EqualsBuilder()
+        .append(this.getProto(), otherImpl.getProto())
+        .isEquals();
+  }
+
+  @Override
+  public FederationQueueWeight getFederationQueueWeight() {
+    if (this.federationQueueWeight != null) {
+      return this.federationQueueWeight;
+    }
+    SaveFederationQueuePolicyRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasFederationQueueWeight()) {
+      return null;
+    }
+    this.federationQueueWeight = convertFromProtoFormat(p.getFederationQueueWeight());
+    return this.federationQueueWeight;
+  }
+
+  @Override
+  public void setFederationQueueWeight(FederationQueueWeight pFederationQueueWeight) {
+    if (pFederationQueueWeight == null) {
+      throw new IllegalArgumentException("FederationQueueWeight cannot be null.");
+    }
+    maybeInitBuilder();
+    this.federationQueueWeight = pFederationQueueWeight;
+    mergeLocalToBuilder();
+  }
+
+  private void mergeLocalToBuilder() {
+    if (this.federationQueueWeight != null) {
+      builder.setFederationQueueWeight(convertToProtoFormat(this.federationQueueWeight));
+    }
+  }
+
+  @Override
+  public String getQueue() {
+    SaveFederationQueuePolicyRequestProtoOrBuilder p = viaProto ? proto : builder;
+    boolean hasQueue = p.hasQueue();
+    if (hasQueue) {
+      return p.getQueue();
+    }
+    return null;
+  }
+
+  @Override
+  public void setQueue(String queue) {
+    maybeInitBuilder();
+    if (queue == null) {
+      builder.clearQueue();
+      return;
+    }
+    builder.setQueue(queue);
+  }
+
+  @Override
+  public String getPolicyManagerClassName() {
+    SaveFederationQueuePolicyRequestProtoOrBuilder p = viaProto ? proto : builder;
+    boolean hasPolicyManagerClassName = p.hasPolicyManagerClassName();
+    if (hasPolicyManagerClassName) {
+      return p.getPolicyManagerClassName();
+    }
+    return null;
+  }
+
+  @Override
+  public void setPolicyManagerClassName(String className) {
+    maybeInitBuilder();
+    if (className == null) {
+      builder.clearPolicyManagerClassName();
+      return;
+    }
+    builder.setPolicyManagerClassName(className);
+  }
+
+  private FederationQueueWeightProto convertToProtoFormat(
+      FederationQueueWeight pFederationQueueWeight) {
+    return ((FederationQueueWeightPBImpl) pFederationQueueWeight).getProto();
+  }
+
+  private FederationQueueWeight convertFromProtoFormat(
+      FederationQueueWeightProto federationQueueWeightProto) {
+    return new FederationQueueWeightPBImpl(federationQueueWeightProto);
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+}

+ 98 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/SaveFederationQueuePolicyResponsePBImpl.java

@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.thirdparty.protobuf.TextFormat;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyResponseProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
+
+@Private
+@Unstable
+public class SaveFederationQueuePolicyResponsePBImpl extends SaveFederationQueuePolicyResponse {
+
+  private SaveFederationQueuePolicyResponseProto proto =
+      SaveFederationQueuePolicyResponseProto.getDefaultInstance();
+  private SaveFederationQueuePolicyResponseProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  public SaveFederationQueuePolicyResponsePBImpl() {
+    builder = SaveFederationQueuePolicyResponseProto.newBuilder();
+  }
+
+  public SaveFederationQueuePolicyResponsePBImpl(SaveFederationQueuePolicyResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public SaveFederationQueuePolicyResponseProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  @Override
+  public String getMessage() {
+    SaveFederationQueuePolicyResponseProtoOrBuilder p = viaProto ? proto : builder;
+    boolean hasMessage = p.hasMessage();
+    if (hasMessage) {
+      return p.getMessage();
+    }
+    return null;
+  }
+
+  private synchronized void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = SaveFederationQueuePolicyResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public void setMessage(String msg) {
+    maybeInitBuilder();
+    if (msg == null) {
+      builder.clearMessage();
+      return;
+    }
+    builder.setMessage(msg);
+  }
+}

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterIdInfo.java

@@ -84,4 +84,9 @@ public class SubClusterIdInfo {
   public int hashCode() {
     return new HashCodeBuilder().append(this.id).toHashCode();
   }
+
+  @Override
+  public String toString() {
+    return "SubClusterIdInfo{ id='" + id + '\'' + '}';
+  }
 }

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java

@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoR
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -311,6 +312,18 @@ public final class FederationStateStoreFacade {
     }
   }
 
+  /**
+   * Set a policy configuration into the state store.
+   *
+   * @param policyConf the policy configuration to set
+   * @throws YarnException if the request is invalid/fails
+   */
+  public void setPolicyConfiguration(SubClusterPolicyConfiguration policyConf)
+      throws YarnException {
+    stateStore.setPolicyConfiguration(
+        SetSubClusterPolicyConfigurationRequest.newInstance(policyConf));
+  }
+
   /**
    * Get the policies that is represented as
    * {@link SubClusterPolicyConfiguration} for all currently active queues in

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java

@@ -175,6 +175,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappin
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
 import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
 
 /**
@@ -964,6 +966,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
     return null;
   }
 
+  @Override
+  public SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
+      SaveFederationQueuePolicyRequest request) throws YarnException, IOException {
+    return null;
+  }
+
   @VisibleForTesting
   public HashMap<ApplicationId, List<ContainerId>> getApplicationContainerIdMap() {
     return applicationContainerIdMap;

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java

@@ -97,6 +97,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceReque
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
@@ -1054,6 +1056,13 @@ public class AdminService extends CompositeService implements
         "Please call Router's deregisterSubCluster to set.");
   }
 
+  @Override
+  public SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
+      SaveFederationQueuePolicyRequest request) throws YarnException, IOException {
+    throw new YarnException("It is not allowed to call the RM's saveFederationQueuePolicy. " +
+        " Please call Router's deregisterSubCluster to set Policy.");
+  }
+
   private void validateAttributesExists(
       List<NodeToAttributes> nodesToAttributes) throws IOException {
     NodeAttributesManager nodeAttributesManager =

+ 31 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java

@@ -149,6 +149,8 @@ public final class RouterMetrics {
   private MutableGaugeInt numRefreshUserToGroupsMappingsFailedRetrieved;
   @Metric("# of deregisterSubCluster failed to be retrieved")
   private MutableGaugeInt numDeregisterSubClusterFailedRetrieved;
+  @Metric("# of saveFederationQueuePolicy failed to be retrieved")
+  private MutableGaugeInt numSaveFederationQueuePolicyFailedRetrieved;
   @Metric("# of refreshAdminAcls failed to be retrieved")
   private MutableGaugeInt numRefreshAdminAclsFailedRetrieved;
   @Metric("# of refreshServiceAcls failed to be retrieved")
@@ -295,6 +297,8 @@ public final class RouterMetrics {
   private MutableRate totalSucceededGetSchedulerInfoRetrieved;
   @Metric("Total number of successful Retrieved DeregisterSubCluster and latency(ms)")
   private MutableRate totalSucceededDeregisterSubClusterRetrieved;
+  @Metric("Total number of successful Retrieved SaveFederationQueuePolicy and latency(ms)")
+  private MutableRate totalSucceededSaveFederationQueuePolicyRetrieved;
   @Metric("Total number of successful Retrieved RefreshAdminAcls and latency(ms)")
   private MutableRate totalSucceededRefreshAdminAclsRetrieved;
   @Metric("Total number of successful Retrieved RefreshServiceAcls and latency(ms)")
@@ -381,6 +385,7 @@ public final class RouterMetrics {
   private MutableQuantiles refreshSuperUserGroupsConfLatency;
   private MutableQuantiles refreshUserToGroupsMappingsLatency;
   private MutableQuantiles refreshDeregisterSubClusterLatency;
+  private MutableQuantiles saveFederationQueuePolicyLatency;
   private MutableQuantiles refreshAdminAclsLatency;
   private MutableQuantiles refreshServiceAclsLatency;
   private MutableQuantiles replaceLabelsOnNodesLatency;
@@ -592,6 +597,9 @@ public final class RouterMetrics {
     refreshDeregisterSubClusterLatency = registry.newQuantiles("refreshDeregisterSubClusterLatency",
         "latency of deregister subcluster timeouts", "ops", "latency", 10);
 
+    saveFederationQueuePolicyLatency = registry.newQuantiles("saveFederationQueuePolicyLatency",
+        "latency of refresh subcluster timeouts", "ops", "latency", 10);
+
     refreshAdminAclsLatency = registry.newQuantiles("refreshAdminAclsLatency",
         "latency of refresh admin acls timeouts", "ops", "latency", 10);
 
@@ -921,6 +929,11 @@ public final class RouterMetrics {
     return totalSucceededDeregisterSubClusterRetrieved.lastStat().numSamples();
   }
 
+  @VisibleForTesting
+  public long getNumSucceededSaveFederationQueuePolicyRetrieved() {
+    return totalSucceededSaveFederationQueuePolicyRetrieved.lastStat().numSamples();
+  }
+
   @VisibleForTesting
   public long getNumSucceededRefreshAdminAclsRetrieved() {
     return totalSucceededRefreshAdminAclsRetrieved.lastStat().numSamples();
@@ -1266,6 +1279,11 @@ public final class RouterMetrics {
     return totalSucceededDeregisterSubClusterRetrieved.lastStat().mean();
   }
 
+  @VisibleForTesting
+  public double getLatencySucceededSaveFederationQueuePolicyRetrieved() {
+    return totalSucceededSaveFederationQueuePolicyRetrieved.lastStat().mean();
+  }
+
   @VisibleForTesting
   public double getLatencySucceededRefreshAdminAclsRetrieved() {
     return totalSucceededRefreshAdminAclsRetrieved.lastStat().mean();
@@ -1561,6 +1579,10 @@ public final class RouterMetrics {
     return numDeregisterSubClusterFailedRetrieved.value();
   }
 
+  public int getSaveFederationQueuePolicyFailedRetrieved() {
+    return numSaveFederationQueuePolicyFailedRetrieved.value();
+  }
+
   public int getNumRefreshAdminAclsFailedRetrieved() {
     return numRefreshAdminAclsFailedRetrieved.value();
   }
@@ -1913,6 +1935,11 @@ public final class RouterMetrics {
     refreshDeregisterSubClusterLatency.add(duration);
   }
 
+  public void succeededSaveFederationQueuePolicyRetrieved(long duration) {
+    totalSucceededSaveFederationQueuePolicyRetrieved.add(duration);
+    saveFederationQueuePolicyLatency.add(duration);
+  }
+
   public void succeededRefreshAdminAclsRetrieved(long duration) {
     totalSucceededRefreshAdminAclsRetrieved.add(duration);
     refreshAdminAclsLatency.add(duration);
@@ -2191,6 +2218,10 @@ public final class RouterMetrics {
     numDeregisterSubClusterFailedRetrieved.incr();
   }
 
+  public void incrSaveFederationQueuePolicyFailedRetrieved() {
+    numSaveFederationQueuePolicyFailedRetrieved.incr();
+  }
+
   public void incrRefreshAdminAclsFailedRetrieved() {
     numRefreshAdminAclsFailedRetrieved.incr();
   }

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java

@@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceReque
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -209,4 +211,10 @@ public class DefaultRMAdminRequestInterceptor
       throws YarnException, IOException {
     return rmAdminProxy.deregisterSubCluster(request);
   }
+
+  @Override
+  public SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
+      SaveFederationQueuePolicyRequest request) throws YarnException, IOException {
+    return rmAdminProxy.saveFederationQueuePolicy(request);
+  }
 }

+ 116 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java

@@ -63,10 +63,16 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappin
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters;
+import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
 import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.router.RouterMetrics;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
@@ -79,6 +85,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.Collection;
 import java.util.Set;
 import java.util.Date;
@@ -96,6 +103,9 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
   private static final Logger LOG =
       LoggerFactory.getLogger(FederationRMAdminInterceptor.class);
 
+  private static final String COMMA = ",";
+  private static final String COLON = ":";
+
   private Map<SubClusterId, ResourceManagerAdministrationProtocol> adminRMProxies;
   private FederationStateStoreFacade federationFacade;
   private final Clock clock = new MonotonicClock();
@@ -855,13 +865,118 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
     } catch (Exception e) {
       routerMetrics.incrDeregisterSubClusterFailedRetrieved();
       RouterServerUtil.logAndThrowException(e,
-              "Unable to deregisterSubCluster due to exception. " + e.getMessage());
+          "Unable to deregisterSubCluster due to exception. " + e.getMessage());
     }
 
     routerMetrics.incrDeregisterSubClusterFailedRetrieved();
     throw new YarnException("Unable to deregisterSubCluster.");
   }
 
+  /**
+   * Save the Queue Policy for the Federation.
+   *
+   * @param request saveFederationQueuePolicy Request.
+   * @return Response from saveFederationQueuePolicy.
+   * @throws YarnException exceptions from yarn servers.
+   * @throws IOException if an IO error occurred.
+   */
+  @Override
+  public SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
+      SaveFederationQueuePolicyRequest request) throws YarnException, IOException {
+
+    // Parameter validation.
+
+    if (request == null) {
+      routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing SaveFederationQueuePolicy request.", null);
+    }
+
+    FederationQueueWeight federationQueueWeight = request.getFederationQueueWeight();
+    if (federationQueueWeight == null) {
+      routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing FederationQueueWeight information.", null);
+    }
+
+    String queue = request.getQueue();
+    if (StringUtils.isBlank(queue)) {
+      routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing Queue information.", null);
+    }
+
+    String amRmWeight = federationQueueWeight.getAmrmWeight();
+    FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight);
+
+    String routerWeight = federationQueueWeight.getRouterWeight();
+    FederationQueueWeight.checkSubClusterQueueWeightRatioValid(routerWeight);
+
+    String headRoomAlpha = federationQueueWeight.getHeadRoomAlpha();
+    FederationQueueWeight.checkHeadRoomAlphaValid(headRoomAlpha);
+
+    try {
+      long startTime = clock.getTime();
+      // Step1, get parameters.
+      String policyManagerClassName = request.getPolicyManagerClassName();
+
+
+      // Step2, parse amRMPolicyWeights.
+      Map<SubClusterIdInfo, Float> amRMPolicyWeights = getSubClusterWeightMap(amRmWeight);
+      LOG.debug("amRMPolicyWeights = {}.", amRMPolicyWeights);
+
+      // Step3, parse routerPolicyWeights.
+      Map<SubClusterIdInfo, Float> routerPolicyWeights = getSubClusterWeightMap(routerWeight);
+      LOG.debug("routerWeights = {}.", amRMPolicyWeights);
+
+      // Step4, Initialize WeightedPolicyInfo.
+      WeightedPolicyInfo weightedPolicyInfo = new WeightedPolicyInfo();
+      weightedPolicyInfo.setHeadroomAlpha(Float.parseFloat(headRoomAlpha));
+      weightedPolicyInfo.setAMRMPolicyWeights(amRMPolicyWeights);
+      weightedPolicyInfo.setRouterPolicyWeights(routerPolicyWeights);
+
+      // Step5, Set SubClusterPolicyConfiguration.
+      SubClusterPolicyConfiguration policyConfiguration =
+          SubClusterPolicyConfiguration.newInstance(queue, policyManagerClassName,
+          weightedPolicyInfo.toByteBuffer());
+      federationFacade.setPolicyConfiguration(policyConfiguration);
+      long stopTime = clock.getTime();
+      routerMetrics.succeededSaveFederationQueuePolicyRetrieved(stopTime - startTime);
+      return SaveFederationQueuePolicyResponse.newInstance("save policy success.");
+    } catch (Exception e) {
+      routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
+      RouterServerUtil.logAndThrowException(e,
+          "Unable to saveFederationQueuePolicy due to exception. " + e.getMessage());
+    }
+
+    routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
+    throw new YarnException("Unable to saveFederationQueuePolicy.");
+  }
+
+  /**
+   * Get the Map of SubClusterWeight.
+   *
+   * This method can parse the Weight information of Router and
+   * the Weight information of AMRMProxy.
+   *
+   * An example of a parsed string is as follows:
+   * SC-1:0.7,SC-2:0.3
+   *
+   * @param policyWeight policyWeight.
+   * @return Map of SubClusterWeight.
+   */
+  private Map<SubClusterIdInfo, Float> getSubClusterWeightMap(String policyWeight)
+      throws YarnException {
+    FederationQueueWeight.checkSubClusterQueueWeightRatioValid(policyWeight);
+    Map<SubClusterIdInfo, Float> result = new HashMap<>();
+    String[] policyWeights = policyWeight.split(COMMA);
+    for (String policyWeightItem : policyWeights) {
+      String[] subClusterWeight = policyWeightItem.split(COLON);
+      String subClusterId = subClusterWeight[0];
+      SubClusterIdInfo subClusterIdInfo = new SubClusterIdInfo(subClusterId);
+      String weight = subClusterWeight[1];
+      result.put(subClusterIdInfo, Float.valueOf(weight));
+    }
+    return result;
+  }
+
   /**
    * deregisterSubCluster by SubClusterId.
    *

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java

@@ -65,6 +65,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceReque
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
 import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
 import org.apache.hadoop.yarn.util.LRUCacheHashMap;
@@ -392,4 +394,11 @@ public class RouterRMAdminService extends AbstractService
     RequestInterceptorChainWrapper pipeline = getInterceptorChain();
     return pipeline.getRootInterceptor().deregisterSubCluster(request);
   }
+
+  @Override
+  public SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
+      SaveFederationQueuePolicyRequest request) throws YarnException, IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor().saveFederationQueuePolicy(request);
+  }
 }

+ 33 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java

@@ -623,6 +623,11 @@ public class TestRouterMetrics {
       LOG.info("Mocked: failed getGroupsForUser call");
       metrics.incrGetGroupsForUserFailedRetrieved();
     }
+
+    public void getSaveFederationQueuePolicyFailedRetrieved() {
+      LOG.info("Mocked: failed refreshClusterMaxPriority call");
+      metrics.incrSaveFederationQueuePolicyFailedRetrieved();
+    }
   }
 
   // Records successes for all calls
@@ -952,6 +957,12 @@ public class TestRouterMetrics {
           duration);
       metrics.succeededGetGroupsForUsersRetrieved(duration);
     }
+
+    public void getSaveFederationQueuePolicyRetrieved(long duration) {
+      LOG.info("Mocked: successful SaveFederationQueuePolicy call with duration {}",
+          duration);
+      metrics.succeededSaveFederationQueuePolicyRetrieved(duration);
+    }
   }
 
   @Test
@@ -2208,4 +2219,26 @@ public class TestRouterMetrics {
     Assert.assertEquals(225,
         metrics.getLatencySucceededGetGroupsForUsersRetrieved(), ASSERT_DOUBLE_DELTA);
   }
+
+  @Test
+  public void testSaveFederationQueuePolicyFailedRetrieved() {
+    long totalBadBefore = metrics.getSaveFederationQueuePolicyFailedRetrieved();
+    badSubCluster.getSaveFederationQueuePolicyFailedRetrieved();
+    Assert.assertEquals(totalBadBefore + 1, metrics.getSaveFederationQueuePolicyFailedRetrieved());
+  }
+
+  @Test
+  public void testSaveFederationQueuePolicyRetrieved() {
+    long totalGoodBefore = metrics.getNumSucceededSaveFederationQueuePolicyRetrieved();
+    goodSubCluster.getSaveFederationQueuePolicyRetrieved(150);
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededSaveFederationQueuePolicyRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededSaveFederationQueuePolicyRetrieved(), ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getSaveFederationQueuePolicyRetrieved(300);
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededSaveFederationQueuePolicyRetrieved());
+    Assert.assertEquals(225,
+        metrics.getLatencySucceededSaveFederationQueuePolicyRetrieved(), ASSERT_DOUBLE_DELTA);
+  }
 }

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java

@@ -52,6 +52,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceReque
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusterResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
 
 /**
  * Mock interceptor that does not do anything other than forwarding it to the
@@ -161,4 +163,10 @@ public class PassThroughRMAdminRequestInterceptor
       throws YarnException, IOException {
     return getNextInterceptor().deregisterSubCluster(request);
   }
+
+  @Override
+  public SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
+      SaveFederationQueuePolicyRequest request) throws YarnException, IOException {
+    return getNextInterceptor().saveFederationQueuePolicy(request);
+  }
 }

+ 97 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java

@@ -56,8 +56,15 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioning
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
 import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
 import org.junit.Assert;
@@ -66,6 +73,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -603,4 +611,93 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest {
     assertEquals(1, groups.length);
     assertEquals("admin", groups[0]);
   }
+
+  @Test
+  public void testSaveFederationQueuePolicyErrorRequest() throws Exception {
+    // null request.
+    LambdaTestUtils.intercept(YarnException.class, "Missing SaveFederationQueuePolicy request.",
+        () -> interceptor.saveFederationQueuePolicy(null));
+
+    // federationQueueWeight is null.
+    LambdaTestUtils.intercept(
+        IllegalArgumentException.class, "FederationQueueWeight cannot be null.",
+        () -> SaveFederationQueuePolicyRequest.newInstance("root.a", null, "-"));
+
+    // queue is null
+    FederationQueueWeight federationQueueWeight =
+        FederationQueueWeight.newInstance("SC-1:0.7,SC-2:0.3", "SC-1:0.7,SC-2:0.3", "1.0");
+    SaveFederationQueuePolicyRequest request =
+        SaveFederationQueuePolicyRequest.newInstance("", federationQueueWeight, "-");
+    LambdaTestUtils.intercept(YarnException.class, "Missing Queue information.",
+        () -> interceptor.saveFederationQueuePolicy(request));
+
+    // routerWeight / amrmWeight
+    // The sum of the routerWeight is not equal to 1.
+    FederationQueueWeight federationQueueWeight2 = FederationQueueWeight.newInstance(
+        "SC-1:0.7,SC-2:0.3", "SC-1:0.8,SC-2:0.3", "1.0");
+    SaveFederationQueuePolicyRequest request2 =
+        SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight2, "-");
+    LambdaTestUtils.intercept(YarnException.class,
+        "The sum of ratios for all subClusters must be equal to 1.",
+        () -> interceptor.saveFederationQueuePolicy(request2));
+  }
+
+  @Test
+  public void testSaveFederationQueuePolicyRequest() throws IOException, YarnException {
+
+    // We design unit tests, including 2 SubCluster (SC-1, SC-2)
+    // Router Weight: SC-1=0.7,SC-2=0.3
+    // AMRM Weight: SC-1=0.6,SC-2=0.4
+    // headRoomAlpha: 1.0
+    String queue = "root.a";
+    String subCluster1 = "SC-1";
+    String subCluster2 = "SC-2";
+    String routerWeight = "SC-1:0.7,SC-2:0.3";
+    String amrmWeight = "SC-1:0.6,SC-2:0.4";
+    String headRoomAlpha = "1.0";
+
+    // Step1. Write FederationQueue information to stateStore.
+    String policyTypeName = WeightedLocalityPolicyManager.class.getCanonicalName();
+    FederationQueueWeight federationQueueWeight =
+        FederationQueueWeight.newInstance(routerWeight, amrmWeight, headRoomAlpha);
+    SaveFederationQueuePolicyRequest request =
+        SaveFederationQueuePolicyRequest.newInstance(queue, federationQueueWeight, policyTypeName);
+    SaveFederationQueuePolicyResponse response = interceptor.saveFederationQueuePolicy(request);
+    assertNotNull(response);
+    assertEquals("save policy success.", response.getMessage());
+
+    // Step2. We query Policy information from FederationStateStore.
+    FederationStateStoreFacade federationFacade = interceptor.getFederationFacade();
+    SubClusterPolicyConfiguration policyConfiguration =
+        federationFacade.getPolicyConfiguration(queue);
+    assertNotNull(policyConfiguration);
+    assertEquals(queue, policyConfiguration.getQueue());
+
+    ByteBuffer params = policyConfiguration.getParams();
+    assertNotNull(params);
+    WeightedPolicyInfo weightedPolicyInfo = WeightedPolicyInfo.fromByteBuffer(params);
+    assertNotNull(weightedPolicyInfo);
+
+    SubClusterIdInfo sc1 = new SubClusterIdInfo(subCluster1);
+    SubClusterIdInfo sc2 = new SubClusterIdInfo(subCluster2);
+
+    // Step3. We will compare the accuracy of routerPolicyWeights and amrmPolicyWeights.
+    Map<SubClusterIdInfo, Float> routerPolicyWeights = weightedPolicyInfo.getRouterPolicyWeights();
+    Float sc1Weight = routerPolicyWeights.get(sc1);
+    assertNotNull(sc1Weight);
+    assertEquals(0.7f, sc1Weight.floatValue(), 0.00001);
+
+    Float sc2Weight = routerPolicyWeights.get(sc2);
+    assertNotNull(sc2Weight);
+    assertEquals(0.3f, sc2Weight.floatValue(), 0.00001);
+
+    Map<SubClusterIdInfo, Float> amrmPolicyWeights = weightedPolicyInfo.getAMRMPolicyWeights();
+    Float sc1AMRMWeight = amrmPolicyWeights.get(sc1);
+    assertNotNull(sc1AMRMWeight);
+    assertEquals(0.6f, sc1AMRMWeight.floatValue(), 0.00001);
+
+    Float sc2AMRMWeight = amrmPolicyWeights.get(sc2);
+    assertNotNull(sc2AMRMWeight);
+    assertEquals(0.4f, sc2AMRMWeight.floatValue(), 0.00001);
+  }
 }

+ 37 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md

@@ -235,9 +235,10 @@ SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**.
 |`yarn.federation.subcluster-resolver.class` | `org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl` | The class used to resolve which subcluster a node belongs to, and which subcluster(s) a rack belongs to. |
 |`yarn.federation.machine-list` | `<path of machine-list file>` | Path of machine-list file used by `SubClusterResolver`. Each line of the file is a node with sub-cluster and rack information. Below is the example: <br/> <br/> node1, subcluster1, rack1 <br/> node2, subcluster2, rack1 <br/> node3, subcluster3, rack2 <br/> node4, subcluster3, rack2 |
 
-**How to configure the policy-manager?**
+How to configure the policy-manager?
+--------------------
 
-- Router Policy
+Router Policy
 
   Router Policy defines the logic for determining the routing of an application submission and determines the HomeSubCluster for the application.
 
@@ -263,7 +264,7 @@ SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**.
   - WeightedRandomRouterPolicy
     - This policy implements a weighted random sample among currently active sub-clusters.
 
-- AMRM Policy
+AMRM Policy
 
   AMRM Proxy defines the logic to split the resource request list received by AM among RMs.
 
@@ -282,7 +283,7 @@ SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**.
   - RejectAMRMProxyPolicy
     - This policy simply rejects all requests. Useful to prevent apps from accessing any sub-cluster.
 
-- Policy Manager
+Policy Manager
 
   The PolicyManager is providing a combination of RouterPolicy and AMRMPolicy.
 
@@ -316,6 +317,38 @@ SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**.
   - WeightedLocalityPolicyManager
     - Policy that allows operator to configure "weights" for routing. This picks a LocalityRouterPolicy for the router and a LocalityMulticastAMRMProxyPolicy for the amrmproxy as they are designed to work together.
 
+How to configure the queue policy?
+--------------------
+
+We will provide a set of commands to view and save queue policies.
+
+The Queue Policy(SubClusterPolicyConfiguration) include the following:
+
+| Property       | Description                                                           |
+|:---------------|:----------------------------------------------------------------------|
+| `queue`        | `Queue for Job submission`                                            |
+| `policyType`   | `Policy Manager Class name, Default is UniformBroadcastPolicyManager` |
+| `policyParams` | `It stores serialized objects of WeightedPolicyInfo.`                 |
+
+WeightedPolicyInfo include the following:
+
+- RouterWeight
+
+  Weight for routing applications to different subclusters. We will route the application to different subclusters based on the configured weights.
+  Assuming we have two subclusters, SC-1 and SC-2, with a weight of 0.7 for SC-1 and 0.3 for SC-2,
+  the application will be allocated in such a way that 70% of the applications will be assigned to SC-1 and 30% to SC-2.
+
+- AmRMWeight
+
+  Weight for resource request from ApplicationMaster (AM) to different subclusters' Resource Manager (RM).
+  Assuming we have two subclusters, SC-1 and SC-2, with a weight of 0.6 for SC-1 and 0.4 for SC-2,
+  When AM requesting resources, 60% of the requests will be made to the Resource Manager (RM) of SC-1 and 40% to the RM of SC-2.
+
+- HeadRoomAlpha
+
+  used by policies that balance weight-based and load-based considerations in their decisions.
+  For policies that use this parameter, values close to 1 indicate that most of the decision should be based on currently observed headroom from various sub-clusters, values close to zero, indicate that the decision should be mostly based on weights and practically ignore current load.
+
 ### ON RMs:
 
 These are extra configurations that should appear in the **conf/yarn-site.xml** at each ResourceManager.