Browse source

HADOOP-741. Fix some issues with speculative execution. Contributed by Sanjay.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@478358 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
a0a06dd628

+ 4 - 7
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -328,8 +328,7 @@ class JobInProgress {
         return null;
       }
       ArrayList mapCache = (ArrayList)hostToMaps.get(tts.getHost());
-      double avgProgress = status.mapProgress() / maps.length;
-      int target = findNewTask(tts, clusterSize, avgProgress, 
+      int target = findNewTask(tts, clusterSize, status.mapProgress(), 
                                   maps, mapCache);
       if (target == -1) {
         return null;
@@ -357,8 +356,7 @@ class JobInProgress {
             return null;
         }
 
-        double avgProgress = status.reduceProgress() ;
-        int target = findNewTask(tts, clusterSize, avgProgress, 
+        int target = findNewTask(tts, clusterSize, status.reduceProgress() , 
                                     reduces, null);
         if (target == -1) {
           return null;
@@ -441,7 +439,6 @@ class JobInProgress {
                          task.hasSpeculativeTask(avgProgress) && 
                          ! task.hasRunOnMachine(taskTracker)) {
                 specTarget = i;
-                break ;
               }
             }
           }
@@ -696,8 +693,8 @@ class JobInProgress {
         
         // Delete temp dfs dirs created if any, like in case of 
         // speculative exn of reduces.  
-     //   String tempDir = conf.get("mapred.system.dir") + "/job_" + uniqueString; 
-     //   fs.delete(new Path(tempDir)); 
+        String tempDir = conf.get("mapred.system.dir") + "/job_" + uniqueString; 
+        fs.delete(new Path(tempDir)); 
 
       } catch (IOException e) {
         LOG.warn("Error cleaning up "+profile.getJobId()+": "+e);

+ 6 - 14
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -42,7 +42,6 @@ class TaskInProgress {
     static final int MAX_TASK_FAILURES = 4;    
     static final double SPECULATIVE_GAP = 0.2;
     static final long SPECULATIVE_LAG = 60 * 1000;
-    static final int MAX_CONCURRENT_TASKS = 2; 
     private static NumberFormat idFormat = NumberFormat.getInstance();
     static {
       idFormat.setMinimumIntegerDigits(6);
@@ -445,21 +444,14 @@ class TaskInProgress {
         // REMIND - mjc - these constants should be examined
         // in more depth eventually...
         //
-        if (isMapTask() &&
-            activeTasks.size() <= MAX_TASK_EXECS &&
+      
+      if( activeTasks.size() <= MAX_TASK_EXECS &&
             runSpeculative &&
             (averageProgress - progress >= SPECULATIVE_GAP) &&
-            (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) {
-            return true;
-        }else{
-          //Note: validate criteria for speculative reduce execution
-          if( runSpeculative && (activeTasks.size() < MAX_CONCURRENT_TASKS ) && 
-              (averageProgress - progress >= SPECULATIVE_GAP) &&
-              completes <= 0 &&
-              (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) {
-            return true ; 
-          }
-        }
+            (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG) 
+            && completes == 0) {
+          return true;
+      }
         return false;
     }
     

+ 11 - 20
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -1026,6 +1026,17 @@ public class TaskTracker
             }
             if (keepJobFiles)
               return;
+            
+            // Delete temp directory in case any task used PhasedFileSystem.
+            try{
+              String systemDir = task.getConf().get("mapred.system.dir");
+              String taskTempDir = systemDir + "/" + 
+                  task.getJobId() + "/" + task.getTipId();
+              fs.delete(new Path(taskTempDir)) ;
+            }catch(IOException e){
+              LOG.warn("Error in deleting reduce temporary output",e); 
+            }
+            
             // delete the job diretory for this task 
             // since the job is done/failed
             this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
@@ -1053,26 +1064,6 @@ public class TaskTracker
                 runstate = TaskStatus.State.KILLED;
               }
             }
-            
-            // the temporary file names in speculative exn are generated in 
-            // the launched JVM, and we dont talk to it when killing so cleanup
-            // should happen here. find the task id and delete the temp directory 
-            // for the task. only for killed speculative reduce instances
-            
-            // Note: it would be better to couple this with delete localfiles
-            // which is in conf currently, it doenst belong there. 
-
-            if( !task.isMapTask() && 
-                this.defaultJobConf.getSpeculativeExecution() ){
-              try{
-                String systemDir = task.getConf().get("mapred.system.dir");
-                String taskTempDir = systemDir + "/" + 
-                    task.getJobId() + "/" + task.getTipId();
-                fs.delete(new Path(taskTempDir)) ;
-              }catch(IOException e){
-                LOG.warn("Error in deleting reduce temporary output",e); 
-              }
-            }
         }
 
         /**