MAPREDUCE-6696. Add a configuration to limit the number of map tasks allowed per job. Contributed by Zhihai Xu

Jian He, 9 years ago
Parent commit: 21d2b90213

+ 7 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java

@@ -200,6 +200,13 @@ class JobSubmitter {
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

+      int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
+          MRJobConfig.DEFAULT_JOB_MAX_MAP);
+      if (maxMaps >= 0 && maxMaps < maps) {
+        throw new IllegalArgumentException("The number of map tasks " + maps +
+            " exceeded limit " + maxMaps);
+      }
+
      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
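
A minimal sketch (not part of this commit) of how a client job could opt in to the new limit; the class name, the cap value of 500, and the elided mapper/reducer setup are illustrative assumptions:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class BoundedMapJob {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Cap this job at 500 map tasks; the value is an arbitrary example.
        conf.setInt("mapreduce.job.max.map", 500);
        Job job = Job.getInstance(conf, "bounded-map-job");
        // ... configure mapper, reducer, and input/output paths as usual ...
        // With this commit applied, JobSubmitter throws IllegalArgumentException
        // at submit time if the computed split count exceeds the limit.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }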

+ 7 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -425,6 +425,13 @@ public interface MRJobConfig {
       "mapreduce.job.running.reduce.limit";
       "mapreduce.job.running.reduce.limit";
   public static final int DEFAULT_JOB_RUNNING_REDUCE_LIMIT = 0;
   public static final int DEFAULT_JOB_RUNNING_REDUCE_LIMIT = 0;
 
 
+  /* Config for Limit on the number of map tasks allowed per job
+   * There is no limit if this value is negative.
+   */
+  public static final String JOB_MAX_MAP =
+      "mapreduce.job.max.map";
+  public static final int DEFAULT_JOB_MAX_MAP = -1;
+
  /* config for tracking the local file where all the credentials for the job
   * credentials.
   */

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -91,6 +91,14 @@
  </description>
</property>

+<property>
+  <name>mapreduce.job.max.map</name>
+  <value>-1</value>
+  <description>Limit on the number of map tasks allowed per job.
+  There is no limit if this value is negative.
+  </description>
+</property>
+
<property>
  <name>mapreduce.job.reducer.preempt.delay.sec</name>
  <value>0</value>
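
A hedged sketch of how a site operator might override the shipped default cluster-wide in mapred-site.xml; the value 1000 is an arbitrary example:

    <property>
      <name>mapreduce.job.max.map</name>
      <value>1000</value>
      <description>Reject at submission time any job whose input produces
      more than 1000 splits; remove this override or set a negative value
      to restore the unlimited default.
      </description>
    </property>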

+ 23 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java

@@ -57,7 +57,7 @@ public class TestLocalJobSubmission {

  /**
   * test the local job submission options of
-   * -jt local -libjars
+   * -jt local -libjars.
   * @throws IOException
   */
  @Test
@@ -106,6 +106,28 @@ public class TestLocalJobSubmission {
    assertEquals("dist job res is not 0:", 0, res);
  }

+  /**
+   * test JOB_MAX_MAP configuration.
+   * @throws Exception
+   */
+  @Test
+  public void testJobMaxMapConfig() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(MRConfig.FRAMEWORK_NAME, "local");
+    conf.setInt(MRJobConfig.JOB_MAX_MAP, 0);
+    final String[] args = {
+        "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
+    };
+    int res = -1;
+    try {
+      res = ToolRunner.run(conf, new SleepJob(), args);
+      fail("Job should fail");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getLocalizedMessage().contains(
+          "The number of map tasks 1 exceeded limit"));
+    }
+  }
+
  private Path makeJar(Path p) throws IOException {
    FileOutputStream fos = new FileOutputStream(new File(p.toString()));
    JarOutputStream jos = new JarOutputStream(fos);