
Fix for HADOOP-142. Avoid re-running a task on a node where it has previously failed. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@395069 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting, 19 years ago
commit e504563818
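
The change makes task assignment failure-aware: when a tracker asks for work, tasks that have already failed on that tracker are skipped and only handed out again as a last resort, and after every failure the scan start index is advanced so the failed task drifts to the end of the list. A minimal, self-contained sketch of that idea in Java follows; the names (SimpleTask, FailureAwareScheduler, pickTask, firstToTry) are hypothetical stand-ins, not the Hadoop classes changed in the diffs below.

import java.util.HashSet;
import java.util.Set;

// Minimal sketch (hypothetical classes, not the actual Hadoop code) of the
// failure-aware task assignment introduced by this commit.
class SimpleTask {
    final int id;                            // index of this task within the job
    private final Set<String> machinesWhereFailed = new HashSet<String>();
    private boolean complete = false;

    SimpleTask(int id) { this.id = id; }

    boolean isRunnable() { return !complete; }
    boolean hasFailedOnMachine(String tracker) { return machinesWhereFailed.contains(tracker); }
    void recordFailure(String tracker) { machinesWhereFailed.add(tracker); }
}

class FailureAwareScheduler {
    private final SimpleTask[] tasks;
    // Start scanning here; bumped past a task when it fails, so the failed
    // task is effectively pushed to the end of the scan order.
    private int firstToTry = 0;

    FailureAwareScheduler(SimpleTask[] tasks) { this.tasks = tasks; }

    // Pick a task for the given tracker: prefer tasks that have never failed
    // there, and fall back to a previously-failed one only if nothing else runs.
    SimpleTask pickTask(String tracker) {
        int failedTarget = -1;
        for (int i = 0; i < tasks.length; i++) {
            int realIdx = (i + firstToTry) % tasks.length;
            if (!tasks[realIdx].isRunnable()) continue;
            if (tasks[realIdx].hasFailedOnMachine(tracker)) {
                if (failedTarget < 0) failedTarget = realIdx;   // remember as last resort
            } else {
                return tasks[realIdx];                          // clean host, take it
            }
        }
        return failedTarget >= 0 ? tasks[failedTarget] : null;
    }

    // Record a failure and rotate the scan start past the failed task.
    void taskFailed(SimpleTask task, String tracker) {
        task.recordFailure(tracker);
        firstToTry = (task.id + 1) % tasks.length;
    }
}

JobInProgress.java below applies the same pattern twice, once over the map list (firstMapToTry) and once over the reduce list (firstReduceToTry), while keeping the existing cache-hit and speculative-execution preferences ahead of the failed-host fallback.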

+ 4 - 1
CHANGES.txt

@@ -54,10 +54,13 @@ Trunk (unreleased)
 
 15. Fix HADOOP-115.  Correct an error message.  (Stack via cutting)
 
-16. "Fix HADOOP-133.  Retry pings from child to parent, in case of
+16. Fix HADOOP-133.  Retry pings from child to parent, in case of
    (local) communication problems.  Also log exit status, so that one
     can distinguish patricide from other deaths.  (omalley via cutting)
 
+17. Fix HADOOP-142.  Avoid re-running a task on a host where it has
+    previously failed.  (omalley via cutting)
+
 
 Release 0.1.1 - 2006-04-08
 

+ 54 - 15
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -50,6 +50,8 @@ class JobInProgress {
     long finishTime;
 
     private JobConf conf;
+    private int firstMapToTry = 0;
+    private int firstReduceToTry = 0;
     boolean tasksInited = false;
 
     private LocalFileSystem localFs;
@@ -139,7 +141,8 @@ class JobInProgress {
         // create a map task for each split
         this.maps = new TaskInProgress[numMapTasks];
         for (int i = 0; i < numMapTasks; i++) {
-            maps[i] = new TaskInProgress(jobFile, splits[i], jobtracker, conf, this);
+            maps[i] = new TaskInProgress(jobFile, splits[i], jobtracker, conf, 
+                                         this, i);
         }
 
         //
@@ -278,6 +281,7 @@ class JobInProgress {
         int cacheTarget = -1;
         int stdTarget = -1;
         int specTarget = -1;
+        int failedTarget = -1;
 
         //
         // We end up creating two tasks for the same bucket, because
@@ -296,10 +300,17 @@ class JobInProgress {
         // doesn't have to be transmitted from another node.
         //
         for (int i = 0; i < maps.length; i++) {
-            if (maps[i].hasTaskWithCacheHit(taskTracker, tts)) {
+            int realIdx = (i + firstMapToTry) % maps.length; 
+            if (maps[realIdx].hasTaskWithCacheHit(taskTracker, tts)) {
                 if (cacheTarget < 0) {
-                    cacheTarget = i;
+                  if (maps[realIdx].hasFailedOnMachine(taskTracker)) {
+                    if (failedTarget < 0) {
+                      failedTarget = realIdx;
+                    }
+                  } else {
+                    cacheTarget = realIdx;
                     break;
+                  }
                 }
             }
         }
@@ -310,10 +321,17 @@ class JobInProgress {
         //
         if (cacheTarget < 0) {
             for (int i = 0; i < maps.length; i++) {
-                if (maps[i].hasTask()) {
+                int realIdx = (i + firstMapToTry) % maps.length; 
+                if (maps[realIdx].hasTask()) {
                     if (stdTarget < 0) {
-                        stdTarget = i;
+                      if (maps[realIdx].hasFailedOnMachine(taskTracker)) {
+                        if (failedTarget < 0) {
+                          failedTarget = realIdx;
+                        }
+                      } else {
+                        stdTarget = realIdx;
                         break;
+                      }
                     }
                 }
             }
@@ -325,11 +343,12 @@ class JobInProgress {
         //
         if (cacheTarget < 0 && stdTarget < 0) {
             for (int i = 0; i < maps.length; i++) {        
-                if (maps[i].hasSpeculativeTask(avgProgress)) {
-                    if (specTarget < 0) {
-                        specTarget = i;
+                int realIdx = (i + firstMapToTry) % maps.length; 
+                if (maps[realIdx].hasSpeculativeTask(avgProgress)) {
+                      if (!maps[realIdx].hasFailedOnMachine(taskTracker)) {
+                        specTarget = realIdx;
                         break;
-                    }
+                      }
                 }
             }
         }
@@ -343,6 +362,8 @@ class JobInProgress {
             t = maps[stdTarget].getTaskToRun(taskTracker, tts, avgProgress);
         } else if (specTarget >= 0) {
             t = maps[specTarget].getTaskToRun(taskTracker, tts, avgProgress);
+        } else if (failedTarget >= 0) {
+            t = maps[failedTarget].getTaskToRun(taskTracker, tts, avgProgress);
         }
         return t;
     }
@@ -361,16 +382,23 @@ class JobInProgress {
         Task t = null;
         int stdTarget = -1;
         int specTarget = -1;
+        int failedTarget = -1;
         double avgProgress = status.reduceProgress() / reduces.length;
 
         for (int i = 0; i < reduces.length; i++) {
-            if (reduces[i].hasTask()) {
-                if (stdTarget < 0) {
-                    stdTarget = i;
+            int realIdx = (i + firstReduceToTry) % reduces.length;
+            if (reduces[realIdx].hasTask()) {
+                if (reduces[realIdx].hasFailedOnMachine(taskTracker)) {
+                  if (failedTarget < 0) {
+                    failedTarget = realIdx;
+                  }
+                } else if (stdTarget < 0) {
+                    stdTarget = realIdx;
                 }
-            } else if (reduces[i].hasSpeculativeTask(avgProgress)) {
-                if (specTarget < 0) {
-                    specTarget = i;
+            } else if (reduces[realIdx].hasSpeculativeTask(avgProgress)) {
+                if (specTarget < 0 &&
+                    !reduces[realIdx].hasFailedOnMachine(taskTracker)) {
+                    specTarget = realIdx;
                 }
             }
         }
@@ -379,6 +407,9 @@ class JobInProgress {
             t = reduces[stdTarget].getTaskToRun(taskTracker, tts, avgProgress);
         } else if (specTarget >= 0) {
             t = reduces[specTarget].getTaskToRun(taskTracker, tts, avgProgress);
+        } else if (failedTarget >= 0) {
+            t = reduces[failedTarget].getTaskToRun(taskTracker, tts, 
+                                                   avgProgress);
         }
         return t;
     }
@@ -455,6 +486,14 @@ class JobInProgress {
      */
     public void failedTask(TaskInProgress tip, String taskid, String trackerName) {
         tip.failedSubTask(taskid, trackerName);
+        
+        // After this, try to assign tasks with the one after this, so that
+        // the failed task goes to the end of the list.
+        if (tip.isMapTask()) {
+          firstMapToTry = (tip.getIdWithinJob() + 1) % maps.length;
+        } else {
+          firstReduceToTry = (tip.getIdWithinJob() + 1) % reduces.length;
+        }
             
         //
         // Check if we need to kill the job because of too many failures

+ 24 - 2
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -77,19 +77,24 @@ class TaskInProgress {
     /**
      * Constructor for MapTask
      */
-    public TaskInProgress(String jobFile, FileSplit split, JobTracker jobtracker, JobConf conf, JobInProgress job) {
+    public TaskInProgress(String jobFile, FileSplit split, 
+                          JobTracker jobtracker, JobConf conf, 
+                          JobInProgress job, int partition) {
         this.jobFile = jobFile;
         this.split = split;
         this.jobtracker = jobtracker;
         this.job = job;
         this.conf = conf;
+        this.partition = partition;
         init();
     }
         
     /**
      * Constructor for ReduceTask
      */
-    public TaskInProgress(String jobFile, TaskInProgress predecessors[], int partition, JobTracker jobtracker, JobConf conf, JobInProgress job) {
+    public TaskInProgress(String jobFile, TaskInProgress predecessors[], 
+                          int partition, JobTracker jobtracker, JobConf conf,
+                          JobInProgress job) {
         this.jobFile = jobFile;
         this.predecessors = predecessors;
         this.partition = partition;
@@ -456,4 +461,21 @@ class TaskInProgress {
         }
         return t;
     }
+    
+    /**
+     * Has this task already failed on this machine?
+     * @param tracker The task tracker name
+     * @return Has it failed?
+     */
+    public boolean hasFailedOnMachine(String tracker) {
+      return machinesWhereFailed.contains(tracker);
+    }
+    
+    /**
+     * Get the id of this map or reduce task.
+     * @return The index of this tip in the maps/reduces lists.
+     */
+    public int getIdWithinJob() {
+      return partition;
+    }
 }
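
The new hasFailedOnMachine() method assumes TaskInProgress already maintains a machinesWhereFailed collection of tracker names, filled in when failedSubTask() records a failed attempt. A minimal sketch of that bookkeeping under that assumption (TaskFailureBookkeeping is a hypothetical stand-in, not the real class):

import java.util.TreeSet;

// Sketch of the per-task failure bookkeeping that hasFailedOnMachine() relies on.
// The field name matches the diff above; the failedSubTask() body shows only the
// part relevant here, not the actual Hadoop implementation.
class TaskFailureBookkeeping {
    private final TreeSet<String> machinesWhereFailed = new TreeSet<String>();

    // Called when an attempt of this task fails on the given tracker.
    public void failedSubTask(String taskid, String trackerName) {
        machinesWhereFailed.add(trackerName);
        // ...reset per-attempt state, bump failure counters, etc.
    }

    // Has this task already failed on this machine?
    public boolean hasFailedOnMachine(String tracker) {
        return machinesWhereFailed.contains(tracker);
    }
}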