Explorar o código

YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1531222 13f79535-47bb-0310-9956-ffa450edef68
Luke Lu %!s(int64=11) %!d(string=hai) anos
pai
achega
735d8b27f7

+ 2 - 0
hadoop-yarn-project/CHANGES.txt

@@ -34,6 +34,8 @@ Release 2.3.0 - UNRELEASED
 
   IMPROVEMENTS
 
+    YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
+
     YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)
 
     YARN-1098. Separate out RM services into Always On and Active (Karthik

+ 21 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

@@ -180,6 +180,8 @@ public class ApplicationMaster {
   private int numTotalContainers = 1;
   // Memory to request for the container on which the shell command will run
   private int containerMemory = 10;
+  // VirtualCores to request for the container on which the shell command will run
+  private int containerVirtualCores = 1;
   // Priority of the request
   private int requestPriority;
 
@@ -309,6 +311,8 @@ public class ApplicationMaster {
         "Environment for shell script. Specified as env_key=env_val pairs");
     opts.addOption("container_memory", true,
         "Amount of memory in MB to be requested to run the shell command");
+    opts.addOption("container_vcores", true,
+        "Amount of virtual cores to be requested to run the shell command");
     opts.addOption("num_containers", true,
         "No. of containers on which the shell command needs to be executed");
     opts.addOption("priority", true, "Application Priority. Default 0");
@@ -421,6 +425,8 @@ public class ApplicationMaster {
 
     containerMemory = Integer.parseInt(cliParser.getOptionValue(
         "container_memory", "10"));
+    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
+        "container_vcores", "1"));
     numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
         "num_containers", "1"));
     if (numTotalContainers == 0) {
@@ -492,6 +498,9 @@ public class ApplicationMaster {
     // resource manager
     int maxMem = response.getMaximumResourceCapability().getMemory();
     LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+    
+    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
+    LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
 
     // A resource ask cannot exceed the max.
     if (containerMemory > maxMem) {
@@ -501,6 +510,13 @@ public class ApplicationMaster {
       containerMemory = maxMem;
     }
 
+    if (containerVirtualCores > maxVCores) {
+      LOG.info("Container virtual cores specified above max threshold of cluster."
+          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
+          + maxVCores);
+      containerVirtualCores = maxVCores;
+    }
+
     // Setup ask for containers from RM
     // Send request for containers to RM
     // Until we get our fully allocated quota, we keep on polling RM for
@@ -645,7 +661,9 @@ public class ApplicationMaster {
             + ":" + allocatedContainer.getNodeId().getPort()
             + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
             + ", containerResourceMemory"
-            + allocatedContainer.getResource().getMemory());
+            + allocatedContainer.getResource().getMemory()
+            + ", containerResourceVirtualCores"
+            + allocatedContainer.getResource().getVirtualCores());
         // + ", containerToken"
         // +allocatedContainer.getContainerToken().getIdentifier().toString());
 
@@ -872,9 +890,10 @@ public class ApplicationMaster {
     pri.setPriority(requestPriority);
 
     // Set up resource type requirements
-    // For now, only memory is supported so we set memory requirements
+    // For now, memory and CPU are supported so we set memory and cpu requirements
     Resource capability = Records.newRecord(Resource.class);
     capability.setMemory(containerMemory);
+    capability.setVirtualCores(containerVirtualCores);
 
     ContainerRequest request = new ContainerRequest(capability, null, null,
         pri);

+ 31 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java

@@ -121,6 +121,8 @@ public class Client {
   private String amQueue = "";
   // Amt. of memory resource to request for to run the App Master
   private int amMemory = 10; 
+  // Amt. of virtual core resource to request for to run the App Master
+  private int amVCores = 1;
 
   // Application master jar file
   private String appMasterJar = ""; 
@@ -140,6 +142,8 @@ public class Client {
 
   // Amt of memory to request for container in which shell script will be executed
   private int containerMemory = 10; 
+  // Amt. of virtual cores to request for container in which shell script will be executed
+  private int containerVirtualCores = 1;
   // No. of containers in which the shell script needs to be executed
   private int numContainers = 1;
 
@@ -208,6 +212,7 @@ public class Client {
     opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
     opts.addOption("timeout", true, "Application timeout in milliseconds");
     opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
+    opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
     opts.addOption("jar", true, "Jar file containing the application master");
     opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
     opts.addOption("shell_script", true, "Location of the shell script to be executed");
@@ -215,6 +220,7 @@ public class Client {
     opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
     opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
     opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
+    opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
     opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
     opts.addOption("log_properties", true, "log4j.properties file");
     opts.addOption("debug", false, "Dump out debug information");
@@ -263,11 +269,16 @@ public class Client {
     amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
     amQueue = cliParser.getOptionValue("queue", "default");
     amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));		
-
+    amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
+    
     if (amMemory < 0) {
       throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
           + " Specified memory=" + amMemory);
     }
+    if (amVCores < 0) {
+      throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
+          + " Specified virtual cores=" + amVCores);
+    }
 
     if (!cliParser.hasOption("jar")) {
       throw new IllegalArgumentException("No jar file specified for application master");
@@ -306,11 +317,14 @@ public class Client {
     shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
 
     containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
+    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
     numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
 
-    if (containerMemory < 0 || numContainers < 1) {
-      throw new IllegalArgumentException("Invalid no. of containers or container memory specified, exiting."
+    if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
+      throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
+          + " exiting."
           + " Specified containerMemory=" + containerMemory
+          + ", containerVirtualCores=" + containerVirtualCores
           + ", numContainer=" + numContainers);
     }
 
@@ -383,6 +397,16 @@ public class Client {
       amMemory = maxMem;
     }				
 
+    int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
+    LOG.info("Max virtual cores capabililty of resources in this cluster " + maxVCores);
+    
+    if (amVCores > maxVCores) {
+      LOG.info("AM virtual cores specified above max threshold of cluster. " 
+          + "Using max value." + ", specified=" + amVCores 
+          + ", max=" + maxVCores);
+      amVCores = maxVCores;
+    }
+    
     // set the application name
     ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
     ApplicationId appId = appContext.getApplicationId();
@@ -514,6 +538,7 @@ public class Client {
     vargs.add(appMasterMainClass);
     // Set params for Application Master
     vargs.add("--container_memory " + String.valueOf(containerMemory));
+    vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
     vargs.add("--num_containers " + String.valueOf(numContainers));
     vargs.add("--priority " + String.valueOf(shellCmdPriority));
     if (!shellCommand.isEmpty()) {
@@ -544,9 +569,11 @@ public class Client {
     amContainer.setCommands(commands);
 
     // Set up resource type requirements
-    // For now, only memory is supported so we set memory requirements
+    // For now, both memory and vcores are supported, so we set memory and 
+    // vcores requirements
     Resource capability = Records.newRecord(Resource.class);
     capability.setMemory(amMemory);
+    capability.setVirtualCores(amVCores);
     appContext.setResource(capability);
 
     // Service data is a binary blob that can be passed to the application

+ 31 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

@@ -105,7 +105,7 @@ public class TestDistributedShell {
       }
     }
   }
-
+  
   @Test(timeout=90000)
   public void testDSShell() throws Exception {
 
@@ -118,8 +118,12 @@ public class TestDistributedShell {
         Shell.WINDOWS ? "dir" : "ls",
         "--master_memory",
         "512",
+        "--master_vcores",
+        "2",
         "--container_memory",
-        "128"
+        "128",
+        "--container_vcores",
+        "1"
     };
 
     LOG.info("Initializing DS Client");
@@ -237,6 +241,31 @@ public class TestDistributedShell {
       Assert.assertTrue("The throw exception is not expected",
           e.getMessage().contains("Invalid no. of containers"));
     }
+    
+    LOG.info("Initializing DS Client with invalid no. of vcores");
+    try {
+      String[] args = {
+          "--jar",
+          APPMASTER_JAR,
+          "--num_containers",
+          "2",
+          "--shell_command",
+          Shell.WINDOWS ? "dir" : "ls",
+          "--master_memory",
+          "512",
+          "--master_vcores",
+          "-2",
+          "--container_memory",
+          "128",
+          "--container_vcores",
+          "1"
+      };
+      client.init(args);
+      Assert.fail("Exception is expected");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue("The throw exception is not expected",
+          e.getMessage().contains("Invalid virtual cores specified"));
+    }
   }
 
   protected static void waitForNMToRegister(NodeManager nm)