Ver código fonte

HADOOP-4018. The number of tasks for a single job cannot exceed a
pre-configured maximum value. (dhruba)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.19@703294 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 17 anos atrás
pai
commit
d0ed36d363

+ 3 - 0
CHANGES.txt

@@ -819,6 +819,9 @@ Release 0.19.0 - Unreleased
     HADOOP-4267. Occasional exceptions during shutting down HSQLDB is logged 
     but not rethrown. (enis) 
 
+    HADOOP-4018. The number of tasks for a single job cannot exceed a 
+    pre-configured maximum value. (dhruba)
+
 Release 0.18.2 - Unreleased
 
   BUG FIXES

+ 7 - 0
conf/hadoop-default.xml

@@ -967,6 +967,13 @@ creations/deletions), or "all".</description>
   take priority over this setting.</description>
 </property>
 
+<property>
+  <name>mapred.jobtracker.maxtasks.per.job</name>
+  <value>-1</value>
+  <description>The maximum number of tasks for a single job.
+  A value of -1 indicates that there is no maximum.  </description>
+</property>
+
 <property>
   <name>mapred.submit.replication</name>
   <value>10</value>

+ 13 - 1
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -52,7 +52,7 @@ import org.apache.hadoop.net.Node;
  * ***********************************************************
  */
 class JobInProgress {
-  private static final Log LOG = LogFactory.getLog(JobInProgress.class);
+  static final Log LOG = LogFactory.getLog(JobInProgress.class);
     
   JobProfile profile;
   JobStatus status;
@@ -363,6 +363,18 @@ class JobInProgress {
       splitFile.close();
     }
     numMapTasks = splits.length;
+
+
+    // if the number of splits is larger than a configured value
+    // then fail the job.
+    int maxTasks = jobtracker.getMaxTasksPerJob();
+    if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
+      throw new IOException(
+                "The number of tasks for this job " + 
+                (numMapTasks + numReduceTasks) +
+                " exceeds the configured limit " + maxTasks);
+    }
+
     maps = new TaskInProgress[numMapTasks];
     for(int i=0; i < numMapTasks; ++i) {
       inputLength += splits[i].getDataLength();

+ 7 - 0
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -2721,5 +2721,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     return (JobStatus[]) jobStatusList.toArray(
         new JobStatus[jobStatusList.size()]);
   }
+
+  /**
+   * Returns the confgiured maximum number of tasks for a single job
+   */
+  int getMaxTasksPerJob() {
+    return conf.getInt("mapred.jobtracker.maxtasks.per.job", -1);
+  }
   
 }

+ 117 - 0
src/test/org/apache/hadoop/mapred/TestTaskLimits.java

@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
+/**
+ * A JUnit test to test configired task limits.
+ */
+public class TestTaskLimits extends TestCase {
+
+  {     
+    ((Log4JLogger)JobInProgress.LOG).getLogger().setLevel(Level.ALL);
+  }     
+
+  private static final Log LOG =
+    LogFactory.getLog(TestMiniMRWithDFS.class.getName());
+  
+  static final int NUM_MAPS = 5;
+  static final int NUM_SAMPLES = 100;
+  
+  public static class TestResult {
+    public String output;
+    public RunningJob job;
+    TestResult(RunningJob job, String output) {
+      this.job = job;
+      this.output = output;
+    }
+  }
+  
+  static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
+    LOG.info("runPI");
+    double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, jobconf);
+    double error = Math.abs(Math.PI - estimate);
+    System.out.println("PI estimation " + error);
+  }
+
+  /**
+   * Run the pi test with a specifix value of 
+   * mapred.jobtracker.maxtasks.per.job. Returns true if the job succeeded.
+   */
+  private boolean runOneTest(int maxTasks) throws IOException {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    boolean success = false;
+    try {
+      final int taskTrackers = 2;
+
+      Configuration conf = new Configuration();
+      conf.setInt("mapred.jobtracker.maxtasks.per.job", maxTasks);
+      dfs = new MiniDFSCluster(conf, 4, true, null);
+      fileSys = dfs.getFileSystem();
+      JobConf jconf = new JobConf(conf);
+      mr = new MiniMRCluster(0, 0, taskTrackers, fileSys.getUri().toString(), 1,
+                             null, null, null, jconf);
+      
+      JobConf jc = mr.createJobConf();
+      try {
+        runPI(mr, jc);
+        success = true;
+      } catch (IOException e) {
+        success = false;
+      }
+    } finally {
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown(); }
+    }
+    return success;
+  }
+
+  public void testTaskLimits() throws IOException {
+
+    System.out.println("Job 1 running with max set to 2");
+    boolean status = runOneTest(2);
+    assertTrue(status == false);
+    System.out.println("Job 1 failed as expected.");
+
+    // verify that checking this limit works well. The job
+    // needs 5 mappers and we set the limit to 7.
+    System.out.println("Job 2 running with max set to 7.");
+    status = runOneTest(7);
+    assertTrue(status == true);
+    System.out.println("Job 2 succeeded as expected.");
+
+    System.out.println("Job 3 running with max disabled.");
+    status = runOneTest(-1);
+    assertTrue(status == true);
+    System.out.println("Job 3 succeeded as expected.");
+  }
+}