Browse Source

HADOOP-1304. Make configurable the maximum number of task attempts before a job fails. Contributed by Devaraj.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@534606 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
dd7c58b39e

+ 3 - 0
CHANGES.txt

@@ -306,6 +306,9 @@ Trunk (unreleased changes)
     the new format.  Please backup your data first before upgrading 
     the new format.  Please backup your data first before upgrading 
     (using 'hadoop distcp' for example).  (tomwhite)
     (using 'hadoop distcp' for example).  (tomwhite)
 
 
+91. HADOOP-1304.  Make configurable the maximum number of task
+    attempts before a job fails.  (Devaraj Das via cutting)
+
 
 
 Release 0.12.3 - 2007-04-06
 Release 0.12.3 - 2007-04-06
 
 

+ 18 - 0
conf/hadoop-default.xml

@@ -543,6 +543,24 @@ creations/deletions), or "all".</description>
   </description>
   </description>
 </property>
 </property>
 
 
+<property>
+  <name>mapred.map.max.attempts</name>
+  <value>4</value>
+  <description>Expert: The maximum number of attempts per map task.
+  In other words, framework will try to execute a map task these many number
+  of times before giving up on it.
+  </description>
+</property>
+
+<property>
+  <name>mapred.reduce.max.attempts</name>
+  <value>4</value>
+  <description>Expert: The maximum number of attempts per reduce task.
+  In other words, framework will try to execute a reduce task these many number
+  of times before giving up on it.
+  </description>
+</property>
+
 <property>
 <property>
   <name>mapred.reduce.parallel.copies</name>
   <name>mapred.reduce.parallel.copies</name>
   <value>5</value>
   <value>5</value>

+ 34 - 0
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -510,7 +510,41 @@ public class JobConf extends Configuration {
 
 
   public int getNumReduceTasks() { return getInt("mapred.reduce.tasks", 1); }
   public int getNumReduceTasks() { return getInt("mapred.reduce.tasks", 1); }
   public void setNumReduceTasks(int n) { setInt("mapred.reduce.tasks", n); }
   public void setNumReduceTasks(int n) { setInt("mapred.reduce.tasks", n); }
+  
+  /** Get the configured number of maximum attempts that will be made to run a
+   *  map task, as specified by the <code>mapred.map.max.attempts</code>
+   *  property. If this property is not already set, the default is 4 attempts
+   * @return the max number of attempts
+   */
+  public int getMaxMapAttempts() {
+    return getInt("mapred.map.max.attempts", 4);
+  }
+  /** Expert: Set the number of maximum attempts that will be made to run a
+   *  map task
+   * @param n the number of attempts
+   *
+   */
+  public void setMaxMapAttempts(int n) {
+    setInt("mapred.map.max.attempts", n);
+  }
 
 
+  /** Get the configured number of maximum attempts  that will be made to run a
+   *  reduce task, as specified by the <code>mapred.reduce.max.attempts</code>
+   *  property. If this property is not already set, the default is 4 attempts
+   * @return the max number of attempts
+   */
+  public int getMaxReduceAttempts() {
+    return getInt("mapred.reduce.max.attempts", 4);
+  }
+  /** Expert: Set the number of maximum attempts that will be made to run a
+   *  reduce task
+   * @param n the number of attempts
+   *
+   */
+  public void setMaxReduceAttempts(int n) {
+    setInt("mapred.reduce.max.attempts", n);
+  }
+  
   /**
   /**
    * Get the user-specified job name. This is only used to identify the 
    * Get the user-specified job name. This is only used to identify the 
    * job to the user.
    * job to the user.

+ 16 - 4
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -49,7 +49,7 @@ import org.apache.hadoop.util.StringUtils;
 ////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////
 class TaskInProgress {
 class TaskInProgress {
   static final int MAX_TASK_EXECS = 1;
   static final int MAX_TASK_EXECS = 1;
-  static final int MAX_TASK_FAILURES = 4;    
+  int maxTaskAttempts = 4;    
   static final double SPECULATIVE_GAP = 0.2;
   static final double SPECULATIVE_GAP = 0.2;
   static final long SPECULATIVE_LAG = 60 * 1000;
   static final long SPECULATIVE_LAG = 60 * 1000;
   private static NumberFormat idFormat = NumberFormat.getInstance();
   private static NumberFormat idFormat = NumberFormat.getInstance();
@@ -125,6 +125,7 @@ class TaskInProgress {
     this.job = job;
     this.job = job;
     this.conf = conf;
     this.conf = conf;
     this.partition = partition;
     this.partition = partition;
+    setMaxTaskAttempts();
     init(uniqueString);
     init(uniqueString);
   }
   }
         
         
@@ -141,8 +142,19 @@ class TaskInProgress {
     this.jobtracker = jobtracker;
     this.jobtracker = jobtracker;
     this.job = job;
     this.job = job;
     this.conf = conf;
     this.conf = conf;
+    setMaxTaskAttempts();
     init(uniqueString);
     init(uniqueString);
   }
   }
+  /**
+   * Set the max number of attempts before we declare a TIP as "failed"
+   */
+  private void setMaxTaskAttempts() {
+    if (isMapTask()) {
+      this.maxTaskAttempts = conf.getMaxMapAttempts();
+    } else {
+      this.maxTaskAttempts = conf.getMaxReduceAttempts();
+    }
+  }
 
 
   /**
   /**
    * Make a unique name for this TIP.
    * Make a unique name for this TIP.
@@ -430,7 +442,7 @@ class TaskInProgress {
       numKilledTasks++;
       numKilledTasks++;
     }
     }
 
 
-    if (numTaskFailures >= MAX_TASK_FAILURES) {
+    if (numTaskFailures >= maxTaskAttempts) {
       LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times.");
       LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times.");
       kill();
       kill();
     }
     }
@@ -620,11 +632,11 @@ class TaskInProgress {
 
 
     // Create the 'taskid'; do not count the 'killed' tasks against the job!
     // Create the 'taskid'; do not count the 'killed' tasks against the job!
     String taskid = null;
     String taskid = null;
-    if (nextTaskId < (MAX_TASK_EXECS + MAX_TASK_FAILURES + numKilledTasks)) {
+    if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {
       taskid = new String("task_" + taskIdPrefix + "_" + nextTaskId);
       taskid = new String("task_" + taskIdPrefix + "_" + nextTaskId);
       ++nextTaskId;
       ++nextTaskId;
     } else {
     } else {
-      LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + MAX_TASK_FAILURES) +
+      LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) +
               " (plus " + numKilledTasks + " killed)"  + 
               " (plus " + numKilledTasks + " killed)"  + 
               " attempts for the tip '" + getTIPId() + "'");
               " attempts for the tip '" + getTIPId() + "'");
       return null;
       return null;