Browse Source

HADOOP-578. Failed tasks are no longer placed at the end of the task queue. Contributed by Sanjay.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@468127 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
80626a2013
2 changed files with 12 additions and 13 deletions
  1. 5 0
      CHANGES.txt
  2. 7 13
      src/java/org/apache/hadoop/mapred/JobInProgress.java

+ 5 - 0
CHANGES.txt

@@ -75,6 +75,11 @@ Trunk (unreleased changes)
 20. HADOOP-624.  Fix servlet path to stop a Jetty warning on startup.
     (omalley via cutting)
 
+21. HADOOP-578.  Failed tasks are no longer placed at the end of the
+    task queue.  This was originally done to work around other
+    problems that have now been fixed.  Re-executing failed tasks
+    sooner causes buggy jobs to fail faster.  (Sanjay Dahiya via cutting)
+
 
 Release 0.7.2 - 2006-10-18
 

+ 7 - 13
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -56,8 +56,6 @@ class JobInProgress {
     long finishTime;
 
     private JobConf conf;
-    private int firstMapToTry = 0;
-    private int firstReduceToTry = 0;
     boolean tasksInited = false;
 
     private LocalFileSystem localFs;
@@ -330,7 +328,7 @@ class JobInProgress {
       ArrayList mapCache = (ArrayList)hostToMaps.get(tts.getHost());
       double avgProgress = status.mapProgress() / maps.length;
       int target = findNewTask(tts, clusterSize, avgProgress, 
-                                  maps, firstMapToTry, mapCache);
+                                  maps, mapCache);
       if (target == -1) {
         return null;
       }
@@ -359,7 +357,7 @@ class JobInProgress {
 
         double avgProgress = status.reduceProgress() / reduces.length;
         int target = findNewTask(tts, clusterSize, avgProgress, 
-                                    reduces, firstReduceToTry, null);
+                                    reduces, null);
         if (target == -1) {
           return null;
         }
@@ -388,7 +386,6 @@ class JobInProgress {
                             int clusterSize,
                             double avgProgress,
                             TaskInProgress[] tasks,
-                            int firstTaskToTry,
                             List cachedTasks) {
         String taskTracker = tts.getTrackerName();
         //
@@ -419,8 +416,7 @@ class JobInProgress {
         int failedTarget = -1;
         int specTarget = -1;
         for (int i = 0; i < tasks.length; i++) {
-          int realIdx = (i + firstTaskToTry) % tasks.length; 
-          TaskInProgress task = tasks[realIdx];
+          TaskInProgress task = tasks[i];
           if (task.isRunnable()) {
             // if it failed here and we haven't tried every machine, we
             // don't schedule it here.
@@ -433,15 +429,15 @@ class JobInProgress {
               // failed tasks that aren't running can be scheduled as a last
               // resort
               if (!isRunning && failedTarget == -1) {
-                failedTarget = realIdx;
+                failedTarget = i;
               }
             } else {
               if (!isRunning) {
-                LOG.info("Choosing normal task " + tasks[realIdx].getTIPId());
-                return realIdx;
+                LOG.info("Choosing normal task " + tasks[i].getTIPId());
+                return i;
               } else if (specTarget == -1 &&
                          task.hasSpeculativeTask(avgProgress)) {
-                specTarget = realIdx;
+                specTarget = i;
               }
             }
           }
@@ -625,10 +621,8 @@ class JobInProgress {
         // After this, try to assign tasks with the one after this, so that
         // the failed task goes to the end of the list.
         if (tip.isMapTask()) {
-          firstMapToTry = (tip.getIdWithinJob() + 1) % maps.length;
           failedMapTasks++; 
         } else {
-          firstReduceToTry = (tip.getIdWithinJob() + 1) % reduces.length;
           failedReduceTasks++; 
         }