瀏覽代碼

YARN-11485. [Federation] Router Supports Yarn Admin CLI Cmds. (#6265) Contributed by Shilun Fan.

Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
slfan1989 1 年之前
父節點
當前提交
abd550cff4

+ 157 - 34
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java

@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.MissingArgumentException;
 import org.apache.commons.cli.Option;
@@ -85,6 +86,7 @@ import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
 
 import static org.apache.hadoop.yarn.client.util.YarnClientUtils.NO_LABEL_ERR_MSG;
+import static org.apache.hadoop.yarn.client.util.YarnClientUtils.isYarnFederationEnabled;
 
 @Private
 @Unstable
@@ -99,7 +101,9 @@ public class RMAdminCLI extends HAAdmin {
       "Invalid timeout specified : ";
   private static final Pattern RESOURCE_TYPES_ARGS_PATTERN =
       Pattern.compile("^[0-9]*$");
-
+  private static final String SUBCLUSTERID = "subClusterId";
+  private static final Option OPTION_SUBCLUSTERID = new Option(SUBCLUSTERID, true,
+       "We support setting subClusterId in YARN Federation mode to specify specific subClusters.");
   protected final static Map<String, UsageInfo> ADMIN_USAGE =
       ImmutableMap.<String, UsageInfo>builder()
           .put("-refreshQueues", new UsageInfo("",
@@ -338,37 +342,50 @@ public class RMAdminCLI extends HAAdmin {
         ResourceManagerAdministrationProtocol.class);
   }
 
-  private int refreshQueues() throws IOException, YarnException {
+  private int refreshQueues(String subClusterId) throws IOException, YarnException {
     // Refresh the queue properties
     ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
     RefreshQueuesRequest request = 
       recordFactory.newRecordInstance(RefreshQueuesRequest.class);
+    if (StringUtils.isNotBlank(subClusterId)) {
+      request.setSubClusterId(subClusterId);
+    }
     adminProtocol.refreshQueues(request);
     return 0;
   }
 
-  private int refreshNodes(boolean graceful) throws IOException, YarnException {
+  private int refreshNodes(boolean graceful, String subClusterId)
+      throws IOException, YarnException {
     // Refresh the nodes
     ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
     RefreshNodesRequest request = RefreshNodesRequest.newInstance(
         graceful? DecommissionType.GRACEFUL : DecommissionType.NORMAL);
+    if (isYarnFederationEnabled(getConf()) && StringUtils.isNotBlank(subClusterId)) {
+      request.setSubClusterId(subClusterId);
+    }
     adminProtocol.refreshNodes(request);
     return 0;
   }
 
-  private int refreshNodes(int timeout, String trackingMode)
+  private int refreshNodes(int timeout, String trackingMode, String subClusterId)
       throws IOException, YarnException {
     boolean serverTracking = !"client".equals(trackingMode);
     // Graceful decommissioning with timeout
     ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
     RefreshNodesRequest gracefulRequest = RefreshNodesRequest
         .newInstance(DecommissionType.GRACEFUL, timeout);
+    if (isYarnFederationEnabled(getConf()) && StringUtils.isNotBlank(subClusterId)) {
+      gracefulRequest.setSubClusterId(subClusterId);
+    }
     adminProtocol.refreshNodes(gracefulRequest);
     if (serverTracking) {
       return 0;
     }
     CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest = recordFactory
         .newRecordInstance(CheckForDecommissioningNodesRequest.class);
+    if (isYarnFederationEnabled(getConf()) && StringUtils.isNotBlank(subClusterId)) {
+      checkForDecommissioningNodesRequest.setSubClusterId(subClusterId);
+    }
     long waitingTime;
     boolean nodesDecommissioning = true;
     // As RM enforces timeout automatically, client usually don't need
@@ -408,6 +425,9 @@ public class RMAdminCLI extends HAAdmin {
           + " seconds, issuing forceful decommissioning command.");
       RefreshNodesRequest forcefulRequest = RefreshNodesRequest
           .newInstance(DecommissionType.FORCEFUL);
+      if (isYarnFederationEnabled(getConf()) && StringUtils.isNotBlank(subClusterId)) {
+        forcefulRequest.setSubClusterId(subClusterId);
+      }
       adminProtocol.refreshNodes(forcefulRequest);
     } else {
       System.out.println("Graceful decommissioning completed in " + waitingTime
@@ -416,79 +436,100 @@ public class RMAdminCLI extends HAAdmin {
     return 0;
   }
 
-  private int refreshNodesResources() throws IOException, YarnException {
+  private int refreshNodesResources(String subClusterId)
+      throws IOException, YarnException {
     // Refresh the resources at the Nodemanager
     ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
     RefreshNodesResourcesRequest request =
-    recordFactory.newRecordInstance(RefreshNodesResourcesRequest.class);
+        recordFactory.newRecordInstance(RefreshNodesResourcesRequest.class);
+    if (StringUtils.isNotBlank(subClusterId)) {
+      request.setSubClusterId(subClusterId);
+    }
     adminProtocol.refreshNodesResources(request);
     return 0;
   }
 
-  private int refreshNodes() throws IOException, YarnException {
-    return refreshNodes(false);
+  private int refreshNodes(String subClusterId) throws IOException, YarnException {
+    return refreshNodes(false, subClusterId);
   }
 
-  private int refreshUserToGroupsMappings() throws IOException,
+  private int refreshUserToGroupsMappings(String subClusterId) throws IOException,
       YarnException {
     // Refresh the user-to-groups mappings
     ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
     RefreshUserToGroupsMappingsRequest request = 
       recordFactory.newRecordInstance(RefreshUserToGroupsMappingsRequest.class);
+    if (StringUtils.isNotBlank(subClusterId)) {
+      request.setSubClusterId(subClusterId);
+    }
     adminProtocol.refreshUserToGroupsMappings(request);
     return 0;
   }
   
-  private int refreshSuperUserGroupsConfiguration() throws IOException,
+  private int refreshSuperUserGroupsConfiguration(String subClusterId) throws IOException,
       YarnException {
     // Refresh the super-user groups
     ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
     RefreshSuperUserGroupsConfigurationRequest request = 
       recordFactory.newRecordInstance(RefreshSuperUserGroupsConfigurationRequest.class);
+    if (StringUtils.isNotBlank(subClusterId)) {
+      request.setSubClusterId(subClusterId);
+    }
     adminProtocol.refreshSuperUserGroupsConfiguration(request);
     return 0;
   }
   
-  private int refreshAdminAcls() throws IOException, YarnException {
+  private int refreshAdminAcls(String subClusterId) throws IOException, YarnException {
     // Refresh the admin acls
     ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
     RefreshAdminAclsRequest request = 
       recordFactory.newRecordInstance(RefreshAdminAclsRequest.class);
+    if (StringUtils.isNotBlank(subClusterId)) {
+      request.setSubClusterId(subClusterId);
+    }
     adminProtocol.refreshAdminAcls(request);
     return 0;
   }
   
-  private int refreshServiceAcls() throws IOException, YarnException {
+  private int refreshServiceAcls(String subClusterId) throws IOException, YarnException {
     // Refresh the service acls
     ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
     RefreshServiceAclsRequest request = 
       recordFactory.newRecordInstance(RefreshServiceAclsRequest.class);
+    if (StringUtils.isNotBlank(subClusterId)) {
+      request.setSubClusterId(subClusterId);
+    }
     adminProtocol.refreshServiceAcls(request);
     return 0;
   }
 
-  private int refreshClusterMaxPriority() throws IOException, YarnException {
+  private int refreshClusterMaxPriority(String subClusterId) throws IOException, YarnException {
     // Refresh cluster max priority
     ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
     RefreshClusterMaxPriorityRequest request =
         recordFactory.newRecordInstance(RefreshClusterMaxPriorityRequest.class);
+    if (StringUtils.isNotBlank(subClusterId)) {
+      request.setSubClusterId(subClusterId);
+    }
     adminProtocol.refreshClusterMaxPriority(request);
     return 0;
   }
 
   private int updateNodeResource(String nodeIdStr, Resource resource,
-      int overCommitTimeout) throws YarnException, IOException {
+      int overCommitTimeout, String subClusterId) throws YarnException, IOException {
 
     ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
     UpdateNodeResourceRequest request =
       recordFactory.newRecordInstance(UpdateNodeResourceRequest.class);
     NodeId nodeId = NodeId.fromString(nodeIdStr);
 
-    Map<NodeId, ResourceOption> resourceMap =
-        new HashMap<NodeId, ResourceOption>();
+    Map<NodeId, ResourceOption> resourceMap = new HashMap<>();
     resourceMap.put(
         nodeId, ResourceOption.newInstance(resource, overCommitTimeout));
     request.setNodeResourceMap(resourceMap);
+    if (StringUtils.isNotBlank(subClusterId)) {
+      request.setSubClusterId(subClusterId);
+    }
     adminProtocol.updateNodeResource(request);
     return 0;
   }
@@ -551,8 +592,10 @@ public class RMAdminCLI extends HAAdmin {
         "Add to cluster node labels.");
     opts.addOption("directlyAccessNodeLabelStore", false,
         "Directly access node label store.");
+    opts.addOption(OPTION_SUBCLUSTERID);
+
     int exitCode = -1;
-    CommandLine cliParser = null;
+    CommandLine cliParser;
     try {
       cliParser = new GnuParser().parse(opts, args);
     } catch (MissingArgumentException ex) {
@@ -561,6 +604,7 @@ public class RMAdminCLI extends HAAdmin {
       return exitCode;
     }
 
+    String subClusterId = parseSubClusterId(cliParser);
     List<NodeLabel> labels = YarnClientUtils.buildNodeLabelsFromStr(
         cliParser.getOptionValue("addToClusterNodeLabels"));
     if (cliParser.hasOption("directlyAccessNodeLabelStore")) {
@@ -570,6 +614,9 @@ public class RMAdminCLI extends HAAdmin {
           createAdminProtocol();
       AddToClusterNodeLabelsRequest request =
           AddToClusterNodeLabelsRequest.newInstance(labels);
+      if (StringUtils.isNotBlank(subClusterId)) {
+        request.setSubClusterId(subClusterId);
+      }
       adminProtocol.addToClusterNodeLabels(request);
     }
     return 0;
@@ -582,8 +629,9 @@ public class RMAdminCLI extends HAAdmin {
         "Remove From cluster node labels.");
     opts.addOption("directlyAccessNodeLabelStore", false,
         "Directly access node label store.");
+    opts.addOption(OPTION_SUBCLUSTERID);
     int exitCode = -1;
-    CommandLine cliParser = null;
+    CommandLine cliParser;
     try {
       cliParser = new GnuParser().parse(opts, args);
     } catch (MissingArgumentException ex) {
@@ -592,6 +640,7 @@ public class RMAdminCLI extends HAAdmin {
       return exitCode;
     }
 
+    String subClusterId = parseSubClusterId(cliParser);
     Set<String> labels = buildNodeLabelNamesFromStr(
         cliParser.getOptionValue("removeFromClusterNodeLabels"));
     if (cliParser.hasOption("directlyAccessNodeLabelStore")) {
@@ -602,6 +651,9 @@ public class RMAdminCLI extends HAAdmin {
           createAdminProtocol();
       RemoveFromClusterNodeLabelsRequest request =
           RemoveFromClusterNodeLabelsRequest.newInstance(labels);
+      if(StringUtils.isNotBlank(subClusterId)) {
+        request.setSubClusterId(subClusterId);
+      }
       adminProtocol.removeFromClusterNodeLabels(request);
     }
 
@@ -666,6 +718,7 @@ public class RMAdminCLI extends HAAdmin {
         "Fail on unknown nodes.");
     opts.addOption("directlyAccessNodeLabelStore", false,
         "Directly access node label store.");
+    opts.addOption(OPTION_SUBCLUSTERID);
     int exitCode = -1;
     CommandLine cliParser = null;
     try {
@@ -678,13 +731,15 @@ public class RMAdminCLI extends HAAdmin {
 
     Map<NodeId, Set<String>> map = buildNodeLabelsMapFromStr(
         cliParser.getOptionValue("replaceLabelsOnNode"));
+    String subClusterId = parseSubClusterId(cliParser);
     return replaceLabelsOnNodes(map,
         cliParser.hasOption("failOnUnknownNodes"),
-        cliParser.hasOption("directlyAccessNodeLabelStore"));
+        cliParser.hasOption("directlyAccessNodeLabelStore"), subClusterId);
   }
 
   private int replaceLabelsOnNodes(Map<NodeId, Set<String>> map,
-      boolean failOnUnknownNodes, boolean directlyAccessNodeLabelStore)
+      boolean failOnUnknownNodes, boolean directlyAccessNodeLabelStore,
+      String subClusterId)
       throws IOException, YarnException {
     if (directlyAccessNodeLabelStore) {
       getNodeLabelManagerInstance(getConf()).replaceLabelsOnNode(map);
@@ -694,6 +749,9 @@ public class RMAdminCLI extends HAAdmin {
       ReplaceLabelsOnNodeRequest request =
           ReplaceLabelsOnNodeRequest.newInstance(map);
       request.setFailOnUnknownNodes(failOnUnknownNodes);
+      if (StringUtils.isNotBlank(subClusterId) && isYarnFederationEnabled(getConf())) {
+        request.setSubClusterId(subClusterId);
+      }
       adminProtocol.replaceLabelsOnNode(request);
     }
     return 0;
@@ -739,39 +797,54 @@ public class RMAdminCLI extends HAAdmin {
     //
     // verify that we have enough command line parameters
     //
+    String subClusterId = StringUtils.EMPTY;
     if ("-refreshAdminAcls".equals(cmd) || "-refreshQueues".equals(cmd) ||
         "-refreshNodesResources".equals(cmd) ||
         "-refreshServiceAcl".equals(cmd) ||
         "-refreshUserToGroupsMappings".equals(cmd) ||
-        "-refreshSuperUserGroupsConfiguration".equals(cmd)) {
-      if (args.length != 1) {
+        "-refreshSuperUserGroupsConfiguration".equals(cmd) ||
+        "-refreshClusterMaxPriority".equals(cmd)) {
+      subClusterId = parseSubClusterId(args, isHAEnabled);
+      // If we enable Federation mode, the number of args may be either one or three.
+      // Example: -refreshQueues or -refreshQueues -subClusterId SC-1
+      if (isYarnFederationEnabled(getConf()) && args.length != 1 && args.length != 3) {
+        printUsage(cmd, isHAEnabled);
+        return exitCode;
+      } else if (!isYarnFederationEnabled(getConf()) && args.length != 1) {
+        // If Federation mode is not enabled, then the number of args can only be one.
+        // Example: -refreshQueues
         printUsage(cmd, isHAEnabled);
         return exitCode;
       }
     }
 
+    // If it is federation mode, we will print federation mode information
+    if (isYarnFederationEnabled(getConf())) {
+      System.out.println("Using YARN Federation mode.");
+    }
+
     try {
       if ("-refreshQueues".equals(cmd)) {
-        exitCode = refreshQueues();
+        exitCode = refreshQueues(subClusterId);
       } else if ("-refreshNodes".equals(cmd)) {
         exitCode = handleRefreshNodes(args, cmd, isHAEnabled);
       } else if ("-refreshNodesResources".equals(cmd)) {
-        exitCode = refreshNodesResources();
+        exitCode = refreshNodesResources(subClusterId);
       } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
-        exitCode = refreshUserToGroupsMappings();
+        exitCode = refreshUserToGroupsMappings(subClusterId);
       } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) {
-        exitCode = refreshSuperUserGroupsConfiguration();
+        exitCode = refreshSuperUserGroupsConfiguration(subClusterId);
       } else if ("-refreshAdminAcls".equals(cmd)) {
-        exitCode = refreshAdminAcls();
+        exitCode = refreshAdminAcls(subClusterId);
       } else if ("-refreshServiceAcl".equals(cmd)) {
-        exitCode = refreshServiceAcls();
+        exitCode = refreshServiceAcls(subClusterId);
       } else if ("-refreshClusterMaxPriority".equals(cmd)) {
-        exitCode = refreshClusterMaxPriority();
+        exitCode = refreshClusterMaxPriority(subClusterId);
       } else if ("-getGroups".equals(cmd)) {
         String[] usernames = Arrays.copyOfRange(args, i, args.length);
         exitCode = getGroups(usernames);
       } else if ("-updateNodeResource".equals(cmd)) {
-        exitCode = handleUpdateNodeResource(args, cmd, isHAEnabled);
+        exitCode = handleUpdateNodeResource(args, cmd, isHAEnabled, subClusterId);
       } else if ("-addToClusterNodeLabels".equals(cmd)) {
         exitCode = handleAddToClusterNodeLabels(args, cmd, isHAEnabled);
       } else if ("-removeFromClusterNodeLabels".equals(cmd)) {
@@ -827,6 +900,7 @@ public class RMAdminCLI extends HAAdmin {
         "Indicates the timeout tracking should be handled by the client.");
     opts.addOption("server", false,
         "Indicates the timeout tracking should be handled by the RM.");
+    opts.addOption(OPTION_SUBCLUSTERID);
 
     int exitCode = -1;
     CommandLine cliParser = null;
@@ -839,6 +913,7 @@ public class RMAdminCLI extends HAAdmin {
     }
 
     int timeout = -1;
+    String subClusterId = parseSubClusterId(cliParser);
     if (cliParser.hasOption("g")) {
       String strTimeout = cliParser.getOptionValue("g");
       if (strTimeout != null) {
@@ -853,9 +928,9 @@ public class RMAdminCLI extends HAAdmin {
         printUsage(cmd, isHAEnabled);
         return -1;
       }
-      return refreshNodes(timeout, trackingMode);
+      return refreshNodes(timeout, trackingMode, subClusterId);
     } else {
-      return refreshNodes();
+      return refreshNodes(subClusterId);
     }
   }
 
@@ -875,7 +950,7 @@ public class RMAdminCLI extends HAAdmin {
    * @throws YarnException if any issues thrown from server
    */
   private int handleUpdateNodeResource(
-      String[] args, String cmd, boolean isHAEnabled)
+      String[] args, String cmd, boolean isHAEnabled, String subClusterId)
           throws YarnException, IOException {
     int i = 1;
     int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT;
@@ -913,7 +988,7 @@ public class RMAdminCLI extends HAAdmin {
     if (i == args.length - 1) {
       overCommitTimeout = Integer.parseInt(args[i]);
     }
-    return updateNodeResource(nodeID, resource, overCommitTimeout);
+    return updateNodeResource(nodeID, resource, overCommitTimeout, subClusterId);
   }
 
   private Resource parseCommandAndCreateResource(String resourceTypes) {
@@ -1039,6 +1114,54 @@ public class RMAdminCLI extends HAAdmin {
     return "Usage: rmadmin";
   }
 
+  /**
+   * Parse subClusterId.
+   * This method will only parse subClusterId when Yarn Federation mode is enabled.
+   *
+   * @param cliParser CommandLine.
+   * @return subClusterId.
+   */
+  protected String parseSubClusterId(CommandLine cliParser) {
+    // If YARN Federation mode is not enabled, return empty.
+    if (!isYarnFederationEnabled(getConf())) {
+      return StringUtils.EMPTY;
+    }
+
+    String subClusterId = cliParser.getOptionValue(OPTION_SUBCLUSTERID);
+    if (StringUtils.isBlank(subClusterId)) {
+      return StringUtils.EMPTY;
+    }
+
+    System.out.println("SubClusterId : " + subClusterId);
+    return subClusterId;
+  }
+
+  protected String parseSubClusterId(String[] args, boolean isHAEnabled) {
+    // If YARN Federation mode is not enabled, return empty.
+    if (!isYarnFederationEnabled(getConf())) {
+      return StringUtils.EMPTY;
+    }
+
+    Options opts = new Options();
+    opts.addOption("refreshQueues", false,
+        "Refresh the hosts information at the ResourceManager.");
+    opts.addOption("refreshNodesResources", false, "");
+    opts.addOption("refreshUserToGroupsMappings", false, "");
+    opts.addOption("refreshUserToGroupsMappings", false, "");
+    opts.addOption("updateNodeResource", false, "");
+    opts.addOption(OPTION_SUBCLUSTERID);
+
+    CommandLine cliParser = null;
+    try {
+      cliParser = new DefaultParser().parse(opts, args);
+    } catch (ParseException ex) {
+      System.err.println("parseSubClusterId error, " + ex.getMessage());
+      printUsage(args[0], isHAEnabled);
+    }
+
+    return parseSubClusterId(cliParser);
+  }
+
   public static void main(String[] args) throws Exception {
     int result = ToolRunner.run(new RMAdminCLI(), args);
     System.exit(result);

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java

@@ -1099,4 +1099,27 @@ public class TestRMAdminCLI {
     assertFalse(errOut.contains("-failover"));
     dataErr.reset();
   }
+
+  @Test
+  public void testParseSubClusterId() throws Exception {
+    rmAdminCLI.getConf().setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+
+    // replaceLabelsOnNode
+    String[] replaceLabelsOnNodeArgs = {"-replaceLabelsOnNode",
+        "node1:8000,x node2:8000=y node3,x node4=Y", "-subClusterId", "SC-1"};
+    assertEquals(0, rmAdminCLI.run(replaceLabelsOnNodeArgs));
+
+    String[] refreshQueuesArgs = {"-refreshQueues", "-subClusterId", "SC-1"};
+    assertEquals(0, rmAdminCLI.run(refreshQueuesArgs));
+
+    String[] refreshNodesResourcesArgs = {"-refreshNodesResources", "-subClusterId", "SC-1"};
+    assertEquals(0, rmAdminCLI.run(refreshNodesResourcesArgs));
+
+    String nodeIdStr = "0.0.0.0:0";
+    String resourceTypes = "memory-mb=1024Mi,vcores=1,resource2";
+    String[] updateNodeResourceArgs = {"-updateNodeResource", nodeIdStr,
+        resourceTypes, "-subClusterId", "SC-1"};
+    rmAdminCLI.parseSubClusterId(updateNodeResourceArgs, false);
+    assertEquals(-1, rmAdminCLI.run(updateNodeResourceArgs));
+  }
 }